Rust WebSocket通信库re_ws_comms的使用,实现高效稳定的实时双向数据传输

Rust WebSocket通信库re_ws_comms的使用,实现高效稳定的实时双向数据传输

关于re_ws_comms

re_ws_comms是rerun系列crate的一部分,它是一个用于Rerun服务器和查看器之间进行WebSocket通信的库,提供编码、解码、客户端和服务器功能。

版本信息:

  • 最新版本:0.22.1
  • 支持Rust版本:v1.81.0
  • 许可证:MIT或Apache-2.0

安装

在项目目录中运行以下Cargo命令:

cargo add re_ws_comms

或者在Cargo.toml中添加:

re_ws_comms = "0.22.1"

示例代码

以下是一个使用re_ws_comms实现WebSocket通信的完整示例:

use re_ws_comms::{Client, Server};
use tokio::sync::mpsc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// WebSocket服务器示例
async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
    // 创建WebSocket服务器
    let server = Server::bind("127.0.0.1:8080").await?;
    
    // 接受客户端连接
    let (mut ws_stream, _) = server.accept().await?;
    
    // 创建消息通道
    let (tx, mut rx) = mpsc::channel(32);
    
    // 接收消息任务
    let recv_task = tokio::spawn(async move {
        loop {
            match ws_stream.read_message().await {
                Ok(msg) => {
                    println!("服务器收到: {}", msg);
                    // 处理消息逻辑...
                }
                Err(e) => {
                    eprintln!("接收错误: {}", e);
                    break;
                }
            }
        }
    });
    
    // 发送消息任务
    let send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if let Err(e) = ws_stream.write_message(&msg).await {
                eprintln!("发送错误: {}", e);
                break;
            }
        }
    });
    
    // 等待任务完成
    tokio::select! {
        _ = recv_task => {},
        _ = send_task => {},
    }
    
    Ok(())
}

// WebSocket客户端示例
async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
    // 连接到服务器
    let mut client = Client::connect("ws://127.0.0.1:8080").await?;
    
    // 创建消息通道
    let (tx, mut rx) = mpsc::channel(32);
    
    // 接收消息任务
    let recv_task = tokio::spawn(async move {
        loop {
            match client.read_message().await {
                Ok(msg) => {
                    println!("客户端收到: {}", msg);
                    // 处理消息逻辑...
                }
                Err(e) => {
                    eprintln!("接收错误: {}", e);
                    break;
                }
            }
        }
    });
    
    // 发送消息任务
    let send_task = tokio::spawn(async move {
        // 示例:发送一些消息
        for i in 0..10 {
            let msg = format!("消息 {}", i);
            if let Err(e) = client.write_message(&msg).await {
                eprintln!("发送错误: {}", e);
                break;
            }
            tokio::time::sleep(std::time::Duration::from_secs(1).await;
        }
    });
    
    // 等待任务完成
    tokio::select! {
        _ = recv_task => {},
        _ = send_task => {},
    }
    
    Ok(())
}

#[tokio::main]
async fn main() {
    // 启动服务器
    let server = tokio::spawn(run_server());
    
    // 等待服务器启动
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    
    // 启动客户端
    let client = tokio::spawn(run_client());
    
    // 等待任务完成
    let _ = tokio::join!(server, client);
}

完整示例代码

use re_ws_comms::{Client, Server};
use tokio::sync::mpsc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// WebSocket服务器实现
async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
    // 绑定到本地地址8080端口
    let server = Server::bind("127.0.0.1:8080").await?;
    println!("服务器已启动,等待连接...");
    
    // 接受客户端连接
    let (mut ws_stream, _) = server.accept().await?;
    println!("客户端已连接");
    
    // 创建消息通道用于发送消息
    let (tx, mut rx) = mpsc::channel(32);
    
    // 接收消息任务
    let recv_task = tokio::spawn(async move {
        loop {
            match ws_stream.read_message().await {
                Ok(msg) => {
                    println!("[服务器] 收到消息: {}", msg);
                    
                    // 简单回复逻辑
                    if msg == "ping" {
                        if let Err(e) = ws_stream.write_message("pong").await {
                            eprintln!("回复消息失败: {}", e);
                            break;
                        }
                    }
                }
                Err(e) => {
                    eprintln!("接收消息错误: {}", e);
                    break;
                }
            }
        }
    });
    
    // 发送消息任务
    let send_task = tokio::spawn(async move {
        // 发送欢迎消息
        if let Err(e) = ws_stream.write_message("欢迎连接到WebSocket服务器").await {
            eprintln!("发送欢迎消息失败: {}", e);
            return;
        }
        
        // 处理来自其他部分的发送请求
        while let Some(msg) = rx.recv().await {
            if let Err(e) = ws_stream.write_message(&msg).await {
                eprintln!("发送消息失败: {}", e);
                break;
            }
        }
    });
    
    // 等待任务完成
    tokio::select! {
        _ = recv_task => {},
        _ = send_task => {},
    }
    
    Ok(())
}

// WebSocket客户端实现
async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
    // 连接到服务器
    let mut client = Client::connect("ws://127.0.0.1:8080").await?;
    println!("已连接到服务器");
    
    // 创建消息通道
    let (tx, mut rx) = mpsc::channel(32);
    
    // 接收消息任务
    let recv_task = tokio::spawn(async move {
        loop {
            match client.read_message().await {
                Ok(msg) => {
                    println!("[客户端] 收到消息: {}", msg);
                }
                Err(e) => {
                    eprintln!("接收消息错误: {}", e);
                    break;
                }
            }
        }
    });
    
    // 发送消息任务
    let send_task = tokio::spawn(async move {
        // 发送测试消息
        for i in 0..5 {
            let msg = format!("测试消息 {}", i);
            if let Err(e) = client.write_message(&msg).await {
                eprintln!("发送消息失败: {}", e);
                break;
            }
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
        
        // 发送ping消息测试回复
        if let Err(e) = client.write_message("ping").await {
            eprintln!("发送ping失败: {}", e);
        }
    });
    
    // 等待任务完成
    tokio::select! {
        _ = recv_task => {},
        _ = send_task => {},
    }
    
    Ok(())
}

#[tokio::main]
async fn main() {
    // 启动服务器
    let server = tokio::spawn(run_server());
    
    // 等待服务器启动
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    
    // 启动客户端
    let client = tokio::spawn(run_client());
    
    // 等待任务完成
    let _ = tokio::join!(server, client);
}

关键特性

  1. 双向通信:支持客户端和服务器之间的双向消息传递
  2. 异步支持:基于tokio的异步实现
  3. 稳定性:提供稳定的连接和消息传输
  4. 简单API:易于使用的接口设计

使用场景

  • 实时数据可视化
  • 远程监控和控制
  • 多设备间实时通信
  • 游戏和交互式应用

通过re_ws_comms,开发者可以轻松实现Rust应用中的高效WebSocket通信,构建实时双向数据传输系统。


1 回复

Rust WebSocket通信库re_ws_comms使用指南

简介

re_ws_comms是一个高效的Rust WebSocket通信库,专门设计用于实现稳定可靠的实时双向数据传输。它提供了简洁的API和强大的功能,适用于需要实时通信的各种应用场景。

主要特性

  • 高性能的WebSocket实现
  • 支持双向通信
  • 自动重连机制
  • 消息序列化/反序列化支持
  • 线程安全设计
  • 可定制的错误处理

安装

在Cargo.toml中添加依赖:

[dependencies]
re_ws_comms = "0.1"  # 请使用最新版本号
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

完整示例代码

WebSocket客户端和服务器完整示例

服务器端代码

use re_ws_comms::Server;
use tokio::sync::mpsc;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Debug)]
struct ChatMessage {
    username: String,
    content: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建WebSocket服务器,监听8080端口
    let server = Server::new("127.0.0.1:8080").await?;
    println!("WebSocket服务器已启动,监听127.0.0.1:8080");
    
    while let Ok((mut connection, addr)) = server.accept().await {
        println!("收到来自 {} 的新连接", addr);
        let (tx, mut rx) = mpsc::channel(32);
        
        tokio::spawn(async move {
            // 处理接收到的消息
            while let Some(message) = rx.recv().await {
                // 尝试反序列化为ChatMessage
                if let Ok(chat_msg) = serde_json::from_str::<ChatMessage>(&message) {
                    println!("收到来自 {} 的消息: {}", chat_msg.username, chat_msg.content);
                    // 回复客户端
                    let response = format!("已收到你的消息: {}", chat_msg.content);
                    connection.send(response).await.unwrap();
                } else {
                    println!("收到原始消息: {}", message);
                    // 回声服务器
                    connection.send(format!("Echo: {}", message)).await.unwrap();
                }
            }
        });
    }
    
    Ok(())
}

客户端代码

use re_ws_comms::Client;
use tokio::sync::mpsc;
use serde::{Serialize, Deserialize};
use std::time::Duration;
use tokio::time;

#[derive(Serialize, Deserialize, Debug)]
struct ChatMessage {
    username: String,
    content: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建消息通道
    let (tx, mut rx) = mpsc::channel(32);
    
    // 配置自动重连选项
    let reconnect_options = re_ws_comms::ReconnectOptions {
        max_retries: 5,
        retry_delay: Duration::from_secs(2),
        backoff_multiplier: 1.5,
    };
    
    // 连接WebSocket服务器
    let mut client = Client::connect_with_reconnect(
        "ws://127.0.0.1:8080", 
        tx, 
        reconnect_options
    ).await?;
    
    println!("已连接到WebSocket服务器");
    
    // 启动消息接收处理线程
    tokio::spawn(async move {
        while let Some(message) = rx.recv().await {
            println!("收到服务器响应: {}", message);
        }
    });
    
    // 发送简单文本消息
    client.send("Hello, WebSocket!").await?;
    
    // 发送自定义结构体消息
    let chat_msg = ChatMessage {
        username: "Rust用户".to_string(),
        content: "这是通过WebSocket发送的结构化消息".to_string(),
    };
    client.send_serialized(&chat_msg).await?;
    
    // 发送二进制数据
    let binary_data = vec![0x01, 0x02, 0x03, 0x04];
    client.send_binary(binary_data).await?;
    
    // 保持连接并定时发送消息
    let mut interval = time::interval(Duration::from_secs(5));
    for _ in 0..3 {
        interval.tick().await;
        client.send("定时消息").await?;
    }
    
    Ok(())
}

示例说明

  1. 服务器端

    • 创建WebSocket服务器监听8080端口
    • 接受客户端连接并处理消息
    • 支持处理结构化消息和原始文本消息
    • 对收到的消息进行回声或响应
  2. 客户端

    • 连接到WebSocket服务器
    • 实现自动重连功能
    • 支持发送三种类型消息:文本、结构化数据和二进制数据
    • 单独线程处理接收到的服务器响应
    • 包含定时发送消息的示例
  3. 自定义消息

    • 使用serde进行序列化/反序列化
    • 展示如何发送和接收结构化数据

性能优化建议

  1. 对于高频消息,考虑使用二进制协议而非文本
  2. 批量处理消息以减少系统调用
  3. 使用连接池管理多个WebSocket连接
  4. 根据应用场景调整缓冲区大小

re_ws_comms库为Rust开发者提供了构建高效实时应用的强大工具,其简洁的API和可靠的设计使其成为WebSocket通信的优秀选择。

回到顶部