Rust WebSocket通信库holochain_websocket的使用,实现高效分布式应用网络通信与数据同步

Rust WebSocket通信库holochain_websocket的使用,实现高效分布式应用网络通信与数据同步

holochain_websocket是Holochain用于WebSocket服务和连接的实用工具库。

基本使用

要建立出站连接,可以使用connect函数,它会返回一个元组(WebsocketSender, WebsocketReceiver)。

要打开监听套接字,可以使用WebsocketListener::bind,它会返回一个WebsocketListener,这是一个异步Stream,其项目解析为相同的元组(WebsocketSender, WebsocketReceiver)。

如果需要能够关闭流,可以使用WebsocketListener::bind_with_handle,它会返回一个元组(ListenerHandle, ListenerStream)。可以使用ListenerHandle::close立即关闭或ListenerHandle::close_on在future完成时关闭。

示例代码

use holochain_serialized_bytes::prelude::*;
use holochain_websocket::*;

use std::convert::TryInto;
use tokio_stream::StreamExt;
use url2::prelude::*;

#[derive(serde::Serialize, serde::Deserialize, SerializedBytes, Debug)]
struct TestMessage(pub String);

// 创建一个新的服务器监听连接
let mut server = WebsocketListener::bind(
    url2!("ws://127.0.0.1:0"),
    std::sync::Arc::new(WebsocketConfig::default()),
)
.await
.unwrap();

// 获取服务器地址
let binding = server.local_addr().clone();

tokio::task::spawn(async move {
    // 处理新连接
    while let Some(Ok((_send, mut recv))) = server.next().await {
        tokio::task::spawn(async move {
            // 接收消息并回显
            if let Some((msg, resp)) = recv.next().await {
                // 反序列化消息
                let msg: TestMessage = msg.try_into().unwrap();
                // 如果这是请求则可以响应
                if resp.is_request() {
                    let msg = TestMessage(format!("echo: {}", msg.0));
                    resp.respond(msg.try_into().unwrap()).await.unwrap();
                }
            }
        });
    }
});

// 将客户端连接到服务器
let (mut send, _recv) = connect(binding, std::sync::Arc::new(WebsocketConfig::default()))
    .await
    .unwrap();

let msg = TestMessage("test".to_string());
// 发出请求并获取回显响应
let rsp: TestMessage = send.request(msg).await.unwrap();

assert_eq!("echo: test", &rsp.0,);

完整示例

下面是一个更完整的WebSocket通信示例,展示了如何在分布式应用中使用holochain_websocket进行网络通信和数据同步:

use holochain_serialized_bytes::prelude::*;
use holochain_websocket::*;
use std::convert::TryInto;
use tokio_stream::StreamExt;
use url2::prelude::*;

// 定义消息类型
#[derive(serde::Serialize, serde::Deserialize, SerializedBytes, Debug, Clone)]
struct DistributedMessage {
    pub node_id: String,
    pub payload: Vec<u8>,
    pub timestamp: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 创建WebSocket服务器
    let mut listener = WebsocketListener::bind(
        url2!("ws://127.0.0.1:0"),
        std::sync::Arc::new(WebsocketConfig::default()),
    )
    .await?;

    let server_addr = listener.local_addr().clone();

    // 启动服务器任务
    tokio::spawn(async move {
        while let Some(Ok((send, mut recv))) = listener.next().await {
            let mut sender = send.clone();
            tokio::spawn(async move {
                while let Some((msg, resp)) = recv.next().await {
                    // 处理接收到的消息
                    if let Ok(dist_msg) = msg.try_into::<DistributedMessage>() {
                        println!("Received message from node {}: {:?}", 
                            dist_msg.node_id, dist_msg.payload);
                        
                        // 如果是请求则响应
                        if resp.is_request() {
                            let ack_msg = DistributedMessage {
                                node_id: "server".to_string(),
                                payload: b"ACK".to_vec(),
                                timestamp: std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .unwrap()
                                    .as_secs(),
                            };
                            resp.respond(ack_msg.try_into().unwrap()).await.unwrap();
                        }
                        
                        // 广播消息给其他节点(简化示例)
                        // 实际应用中需要维护连接列表
                        let _ = sender.send(msg).await;
                    }
                }
            });
        }
    });

    // 创建客户端连接
    let (mut client_send, mut client_recv) = connect(
        server_addr,
        std::sync::Arc::new(WebsocketConfig::default()),
    )
    .await?;

    // 发送测试消息
    let test_msg = DistributedMessage {
        node_id: "client1".to_string(),
        payload: b"Hello Distributed World!".to_vec(),
        timestamp: std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    };

    // 发送请求并等待响应
    let response: DistributedMessage = client_send
        .request(test_msg.try_into()?)
        .await?;
    
    println!("Received ACK from server: {:?}", response);

    // 接收广播消息
    while let Some(msg) = client_recv.next().await {
        if let Ok(dist_msg) = msg.try_into::<DistributedMessage>() {
            println!("Received broadcast: {:?}", dist_msg);
        }
    }

    Ok(())
}

贡献

Holochain是一个开源项目。我们欢迎各种形式的参与,并积极努力扩大接受参与的范围。

许可证

Apache-2.0

版权所有 © 2019 - 2024, Holochain Foundation


1 回复

Rust WebSocket通信库holochain_websocket使用指南

概述

holochain_websocket是一个用于构建高效分布式应用的Rust WebSocket通信库,特别适合需要实现网络通信和数据同步的场景。它为Holochain框架提供了WebSocket通信能力,但也可以独立使用。

主要特性

  • 轻量级WebSocket通信实现
  • 支持分布式应用网络
  • 高效的数据同步机制
  • 与Holochain框架无缝集成
  • 提供消息路由和广播功能

安装方法

在Cargo.toml中添加依赖:

[dependencies]
holochain_websocket = "0.0.10"
tokio = { version = "1", features = ["full"] }  # 添加tokio依赖

完整示例:带心跳检测的WebSocket服务器和客户端

WebSocket服务器示例

use holochain_websocket::{WebsocketServer, WebsocketSender};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 绑定到本地地址
    let addr = "127.0.0.1:9000".parse().unwrap();
    let server = WebsocketServer::bind(addr).await.unwrap();
    println!("WebSocket服务器启动在: {}", addr);

    // 使用Arc和Mutex来安全地共享客户端连接
    let clients: Arc<Mutex<HashMap<usize, WebsocketSender>>> = Arc::new(Mutex::new(HashMap::new()));

    // 处理新连接
    let clients_for_accept = clients.clone();
    tokio::spawn(async move {
        let mut id_counter = 0;
        while let Some((id, sender, mut receiver)) = server.accept().await {
            let clients_for_handler = clients_for_accept.clone();
            let sender_clone = sender.clone();

            // 将新客户端添加到连接池
            clients_for_accept.lock().await.insert(id, sender_clone);

            // 为每个客户端生成处理任务
            tokio::spawn(async move {
                println!("客户端 {} 已连接", id);

                // 心跳检测任务
                let heartbeat_sender = sender.clone();
                tokio::spawn(async move {
                    loop {
                        sleep(Duration::from_secs(30)).await;
                        if let Err(e) = heartbeat_sender.send("ping".to_string()).await {
                            println!("心跳发送失败: {}", e);
                            break;
                        }
                    }
                });

                // 消息处理循环
                while let Some(msg) = receiver.recv().await {
                    match msg.as_str() {
                        "pong" => {
                            println!("客户端 {} 心跳响应", id);
                        }
                        _ => {
                            println!("客户端 {} 发送: {}", id, msg);

                            // 广播给其他客户端
                            let mut clients = clients_for_handler.lock().await;
                            for (client_id, client_sender) in clients.iter_mut() {
                                if *client_id != id {
                                    let _ = client_sender.send(format!("客户端{}说: {}", id, msg)).await;
                                }
                            }
                        }
                    }
                }

                // 客户端断开连接时清理资源
                println!("客户端 {} 断开连接", id);
                clients_for_handler.lock().await.remove(&id);
            });

            id_counter += 1;
        }
    });

    // 等待Ctrl+C信号
    tokio::signal::ctrl_c().await.unwrap();
    println!("服务器正在关闭...");
}

WebSocket客户端示例

use holochain_websocket::WebsocketSender;
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 连接服务器
    let stream = match TcpStream::connect("127.0.0.1:9000").await {
        Ok(s) => s,
        Err(e) => {
            eprintln!("连接服务器失败: {}", e);
            return;
        }
    };

    // 建立WebSocket连接
    let (mut sender, mut receiver) = match WebsocketSender::connect(stream).await {
        Ok((s, r)) => (s, r),
        Err(e) => {
            eprintln!("WebSocket握手失败: {}", e);
            return;
        }
    };

    println!("已连接到服务器");

    // 心跳检测任务
    let mut heartbeat_sender = sender.clone();
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(30)).await;
            if let Err(e) = heartbeat_sender.send("pong".to_string()).await {
                eprintln!("心跳响应失败: {}", e);
                break;
            }
        }
    });

    // 消息发送任务
    let mut message_sender = sender.clone();
    tokio::spawn(async move {
        for i in 1..=5 {
            sleep(Duration::from_secs(1)).await;
            if let Err(e) = message_sender.send(format!("消息 {}", i)).await {
                eprintln!("发送消息失败: {}", e);
                break;
            }
        }
    });

    // 消息接收循环
    while let Some(msg) = receiver.recv().await {
        match msg.as_str() {
            "ping" => {
                if let Err(e) = sender.send("pong".to_string()).await {
                    eprintln!("响应心跳失败: {}", e);
                    break;
                }
            }
            _ => {
                println!("收到消息: {}", msg);
            }
        }
    }

    println!("连接已关闭");
}

完整示例说明

  1. 服务器功能:

    • 监听指定端口接受WebSocket连接
    • 维护所有连接的客户端列表
    • 实现心跳检测机制(每30秒发送ping)
    • 支持消息广播功能
    • 自动清理断开连接的客户端资源
  2. 客户端功能:

    • 连接到指定的WebSocket服务器
    • 实现心跳响应机制
    • 支持发送和接收消息
    • 自动处理服务器ping消息
  3. 扩展功能:

    • 使用Arc和Mutex实现线程安全的连接管理
    • 通过tokio::spawn实现并发处理
    • 完善的错误处理和资源清理

运行步骤

  1. 首先启动服务器程序
  2. 然后启动一个或多个客户端程序
  3. 客户端会自动发送5条测试消息并接收服务器广播
  4. 按下Ctrl+C停止服务器

这个完整示例展示了holochain_websocket库的核心功能,包括连接管理、消息传递、心跳检测和广播功能,可以作为开发分布式WebSocket应用的基础模板。

回到顶部