Rust WebSocket通信库holochain_websocket的使用,实现高效分布式应用网络通信与数据同步
Rust WebSocket通信库holochain_websocket的使用,实现高效分布式应用网络通信与数据同步
holochain_websocket是Holochain用于WebSocket服务和连接的实用工具库。
基本使用
要建立出站连接,可以使用connect
函数,它会返回一个元组(WebsocketSender
, WebsocketReceiver
)。
要打开监听套接字,可以使用WebsocketListener::bind
,它会返回一个WebsocketListener
,这是一个异步Stream,其项目解析为相同的元组(WebsocketSender
, WebsocketReceiver
)。
如果需要能够关闭流,可以使用WebsocketListener::bind_with_handle
,它会返回一个元组(ListenerHandle
, ListenerStream
)。可以使用ListenerHandle::close
立即关闭或ListenerHandle::close_on
在future完成时关闭。
示例代码
use holochain_serialized_bytes::prelude::*;
use holochain_websocket::*;
use std::convert::TryInto;
use tokio_stream::StreamExt;
use url2::prelude::*;
#[derive(serde::Serialize, serde::Deserialize, SerializedBytes, Debug)]
struct TestMessage(pub String);
// 创建一个新的服务器监听连接
let mut server = WebsocketListener::bind(
url2!("ws://127.0.0.1:0"),
std::sync::Arc::new(WebsocketConfig::default()),
)
.await
.unwrap();
// 获取服务器地址
let binding = server.local_addr().clone();
tokio::task::spawn(async move {
// 处理新连接
while let Some(Ok((_send, mut recv))) = server.next().await {
tokio::task::spawn(async move {
// 接收消息并回显
if let Some((msg, resp)) = recv.next().await {
// 反序列化消息
let msg: TestMessage = msg.try_into().unwrap();
// 如果这是请求则可以响应
if resp.is_request() {
let msg = TestMessage(format!("echo: {}", msg.0));
resp.respond(msg.try_into().unwrap()).await.unwrap();
}
}
});
}
});
// 将客户端连接到服务器
let (mut send, _recv) = connect(binding, std::sync::Arc::new(WebsocketConfig::default()))
.await
.unwrap();
let msg = TestMessage("test".to_string());
// 发出请求并获取回显响应
let rsp: TestMessage = send.request(msg).await.unwrap();
assert_eq!("echo: test", &rsp.0,);
完整示例
下面是一个更完整的WebSocket通信示例,展示了如何在分布式应用中使用holochain_websocket进行网络通信和数据同步:
use holochain_serialized_bytes::prelude::*;
use holochain_websocket::*;
use std::convert::TryInto;
use tokio_stream::StreamExt;
use url2::prelude::*;
// 定义消息类型
#[derive(serde::Serialize, serde::Deserialize, SerializedBytes, Debug, Clone)]
struct DistributedMessage {
pub node_id: String,
pub payload: Vec<u8>,
pub timestamp: u64,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 创建WebSocket服务器
let mut listener = WebsocketListener::bind(
url2!("ws://127.0.0.1:0"),
std::sync::Arc::new(WebsocketConfig::default()),
)
.await?;
let server_addr = listener.local_addr().clone();
// 启动服务器任务
tokio::spawn(async move {
while let Some(Ok((send, mut recv))) = listener.next().await {
let mut sender = send.clone();
tokio::spawn(async move {
while let Some((msg, resp)) = recv.next().await {
// 处理接收到的消息
if let Ok(dist_msg) = msg.try_into::<DistributedMessage>() {
println!("Received message from node {}: {:?}",
dist_msg.node_id, dist_msg.payload);
// 如果是请求则响应
if resp.is_request() {
let ack_msg = DistributedMessage {
node_id: "server".to_string(),
payload: b"ACK".to_vec(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
resp.respond(ack_msg.try_into().unwrap()).await.unwrap();
}
// 广播消息给其他节点(简化示例)
// 实际应用中需要维护连接列表
let _ = sender.send(msg).await;
}
}
});
}
});
// 创建客户端连接
let (mut client_send, mut client_recv) = connect(
server_addr,
std::sync::Arc::new(WebsocketConfig::default()),
)
.await?;
// 发送测试消息
let test_msg = DistributedMessage {
node_id: "client1".to_string(),
payload: b"Hello Distributed World!".to_vec(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
// 发送请求并等待响应
let response: DistributedMessage = client_send
.request(test_msg.try_into()?)
.await?;
println!("Received ACK from server: {:?}", response);
// 接收广播消息
while let Some(msg) = client_recv.next().await {
if let Ok(dist_msg) = msg.try_into::<DistributedMessage>() {
println!("Received broadcast: {:?}", dist_msg);
}
}
Ok(())
}
贡献
Holochain是一个开源项目。我们欢迎各种形式的参与,并积极努力扩大接受参与的范围。
许可证
Apache-2.0
版权所有 © 2019 - 2024, Holochain Foundation
Rust WebSocket通信库holochain_websocket使用指南
概述
holochain_websocket是一个用于构建高效分布式应用的Rust WebSocket通信库,特别适合需要实现网络通信和数据同步的场景。它为Holochain框架提供了WebSocket通信能力,但也可以独立使用。
主要特性
- 轻量级WebSocket通信实现
- 支持分布式应用网络
- 高效的数据同步机制
- 与Holochain框架无缝集成
- 提供消息路由和广播功能
安装方法
在Cargo.toml中添加依赖:
[dependencies]
holochain_websocket = "0.0.10"
tokio = { version = "1", features = ["full"] } # 添加tokio依赖
完整示例:带心跳检测的WebSocket服务器和客户端
WebSocket服务器示例
use holochain_websocket::{WebsocketServer, WebsocketSender};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 绑定到本地地址
let addr = "127.0.0.1:9000".parse().unwrap();
let server = WebsocketServer::bind(addr).await.unwrap();
println!("WebSocket服务器启动在: {}", addr);
// 使用Arc和Mutex来安全地共享客户端连接
let clients: Arc<Mutex<HashMap<usize, WebsocketSender>>> = Arc::new(Mutex::new(HashMap::new()));
// 处理新连接
let clients_for_accept = clients.clone();
tokio::spawn(async move {
let mut id_counter = 0;
while let Some((id, sender, mut receiver)) = server.accept().await {
let clients_for_handler = clients_for_accept.clone();
let sender_clone = sender.clone();
// 将新客户端添加到连接池
clients_for_accept.lock().await.insert(id, sender_clone);
// 为每个客户端生成处理任务
tokio::spawn(async move {
println!("客户端 {} 已连接", id);
// 心跳检测任务
let heartbeat_sender = sender.clone();
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(30)).await;
if let Err(e) = heartbeat_sender.send("ping".to_string()).await {
println!("心跳发送失败: {}", e);
break;
}
}
});
// 消息处理循环
while let Some(msg) = receiver.recv().await {
match msg.as_str() {
"pong" => {
println!("客户端 {} 心跳响应", id);
}
_ => {
println!("客户端 {} 发送: {}", id, msg);
// 广播给其他客户端
let mut clients = clients_for_handler.lock().await;
for (client_id, client_sender) in clients.iter_mut() {
if *client_id != id {
let _ = client_sender.send(format!("客户端{}说: {}", id, msg)).await;
}
}
}
}
}
// 客户端断开连接时清理资源
println!("客户端 {} 断开连接", id);
clients_for_handler.lock().await.remove(&id);
});
id_counter += 1;
}
});
// 等待Ctrl+C信号
tokio::signal::ctrl_c().await.unwrap();
println!("服务器正在关闭...");
}
WebSocket客户端示例
use holochain_websocket::WebsocketSender;
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 连接服务器
let stream = match TcpStream::connect("127.0.0.1:9000").await {
Ok(s) => s,
Err(e) => {
eprintln!("连接服务器失败: {}", e);
return;
}
};
// 建立WebSocket连接
let (mut sender, mut receiver) = match WebsocketSender::connect(stream).await {
Ok((s, r)) => (s, r),
Err(e) => {
eprintln!("WebSocket握手失败: {}", e);
return;
}
};
println!("已连接到服务器");
// 心跳检测任务
let mut heartbeat_sender = sender.clone();
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(30)).await;
if let Err(e) = heartbeat_sender.send("pong".to_string()).await {
eprintln!("心跳响应失败: {}", e);
break;
}
}
});
// 消息发送任务
let mut message_sender = sender.clone();
tokio::spawn(async move {
for i in 1..=5 {
sleep(Duration::from_secs(1)).await;
if let Err(e) = message_sender.send(format!("消息 {}", i)).await {
eprintln!("发送消息失败: {}", e);
break;
}
}
});
// 消息接收循环
while let Some(msg) = receiver.recv().await {
match msg.as_str() {
"ping" => {
if let Err(e) = sender.send("pong".to_string()).await {
eprintln!("响应心跳失败: {}", e);
break;
}
}
_ => {
println!("收到消息: {}", msg);
}
}
}
println!("连接已关闭");
}
完整示例说明
-
服务器功能:
- 监听指定端口接受WebSocket连接
- 维护所有连接的客户端列表
- 实现心跳检测机制(每30秒发送ping)
- 支持消息广播功能
- 自动清理断开连接的客户端资源
-
客户端功能:
- 连接到指定的WebSocket服务器
- 实现心跳响应机制
- 支持发送和接收消息
- 自动处理服务器ping消息
-
扩展功能:
- 使用Arc和Mutex实现线程安全的连接管理
- 通过tokio::spawn实现并发处理
- 完善的错误处理和资源清理
运行步骤
- 首先启动服务器程序
- 然后启动一个或多个客户端程序
- 客户端会自动发送5条测试消息并接收服务器广播
- 按下Ctrl+C停止服务器
这个完整示例展示了holochain_websocket库的核心功能,包括连接管理、消息传递、心跳检测和广播功能,可以作为开发分布式WebSocket应用的基础模板。