Rust异步WebSocket库async-wsocket的使用:高性能实时通信与双向数据传输

Rust异步WebSocket库async-wsocket的使用:高性能实时通信与双向数据传输

示例代码

use std::time::Duration;

use async_wsocket::{ConnectionMode, Url, WsMessage};
use futures_util::{SinkExt, StreamExt};

const NONCE: u64 = 123456789;

#[tokio::main]
async fn main() {
    // 解析WebSocket URL
    let url = Url::parse("ws://oxtrdevav64z64yb7x6rjg4ntzqjhedm5b5zjqulugknhzr46ny2qbad.onion").unwrap();
    
    // 建立连接,使用Tor模式,超时时间120秒
    let (mut tx, mut rx) = async_wsocket::connect(&url, ConnectionMode::tor(), Duration::from_secs(120))
        .await
        .unwrap();

    // 发送Ping消息
    let nonce = NONCE.to_be_bytes().to_vec();
    tx.send(WsMessage::Ping(nonce.clone())).await.unwrap();

    // 监听消息
    while let Some(msg) = rx.next().await {
        if let Ok(WsMessage::Pong(bytes)) = msg {
            assert_eq!(nonce, bytes);
            println!("Pong match!");
            break;
        }
    }
}

完整示例

以下是一个更完整的WebSocket客户端示例,展示了如何建立连接、发送和接收各种类型的消息:

use std::time::Duration;
use async_wsocket::{ConnectionMode, Url, WsMessage};
use futures_util::{SinkExt, StreamExt};
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 创建WebSocket连接
    let url = Url::parse("ws://echo.websocket.org")?; // 使用公共测试服务器
    let (mut tx, mut rx) = async_wsocket::connect(
        &url,
        ConnectionMode::Plain, // 普通连接模式
        Duration::from_secs(30) // 超时时间
    ).await?;

    println!("已连接到 {}", url);

    // 2. 发送文本消息
    let text_msg = WsMessage::Text("Hello, WebSocket!".to_string());
    tx.send(text_msg).await?;
    println!("已发送文本消息");

    // 3. 发送二进制消息
    let binary_msg = WsMessage::Binary(vec![1, 2, 3, 4, 5]);
    tx.send(binary_msg).await?;
    println!("已发送二进制消息");

    // 4. 发送Ping并等待Pong
    let ping_data = b"ping data".to_vec();
    tx.send(WsMessage::Ping(ping_data.clone())).await?;
    println!("已发送Ping");

    // 5. 异步接收消息
    tokio::spawn(async move {
        while let Some(msg) = rx.next().await {
            match msg {
                Ok(WsMessage::Text(text)) => println!("收到文本消息: {}", text),
                Ok(WsMessage::Binary(data)) => println!("收到二进制数据: {:?}", data),
                Ok(WsMessage::Pong(data)) => {
                    println!("收到Pong响应,数据: {:?}", data);
                    assert_eq!(data, ping_data);
                }
                Ok(WsMessage::Close(frame)) => {
                    println!("连接已关闭: {:?}", frame);
                    break;
                }
                Err(e) => {
                    eprintln!("接收消息出错: {}", e);
                    break;
                }
                _ => {}
            }
        }
    });

    // 6. 保持连接5秒
    sleep(Duration::from_secs(5)).await;

    // 7. 优雅关闭连接
    tx.send(WsMessage::Close(None)).await?;
    println!("连接已正常关闭");

    Ok(())
}

特性标志

async-wsocket库提供了以下可选特性:

特性 默认启用 功能描述
socks 支持SOCKS代理
tor 内置Tor客户端支持
tor-launch-service 支持启动隐藏的洋葱服务

Rust版本要求

  • 基础功能(默认): Rust 1.63.0+
  • Tor功能: Rust 1.70.0+
  • WASM目标: Rust 1.73.0+

授权许可

本项目采用MIT开源协议发布。


1 回复

以下是基于提供的async-wsocket使用指南整理的完整示例代码及说明:


完整WebSocket客户端+服务器示例

1. WebSocket客户端完整示例

use async_wsocket::{ClientBuilder, Error};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // 1. 创建带心跳的客户端
    let mut client = ClientBuilder::new()
        .url("ws://127.0.0.1:8080")?  // 连接本地服务器
        .heartbeat_interval(Duration::from_secs(15)) // 15秒心跳
        .connect()
        .await?;
    
    // 2. 发送文本消息
    client.send_text("Hello Server!").await?;
    
    // 3. 发送二进制数据
    let binary_data = vec![0xDE, 0xAD, 0xBE, 0xEF];
    client.send_binary(binary_data).await?;

    // 4. 持续接收消息
    loop {
        match client.receive().await? {
            async_wsocket::Message::Text(text) => {
                println!("[Client] Received text: {}", text);
            }
            async_wsocket::Message::Binary(bin) => {
                println!("[Client] Received binary: {:?}", bin);
            }
            async_wsocket::Message::Close(_) => {
                println!("[Client] Connection closed");
                break;
            }
            _ => {}
        }
    }
    
    Ok(())
}

2. WebSocket服务器完整示例

use async_wsocket::{ServerBuilder, Error};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // 1. 创建消息广播通道
    let (tx, _) = broadcast::channel(32);
    
    // 2. 启动服务器
    let server = ServerBuilder::new()
        .bind("127.0.0.1:8080")
        .await?;
    
    println!("Server started on ws://127.0.0.1:8080");

    // 3. 接受客户端连接
    while let Ok((mut client, addr)) = server.accept().await {
        let tx = tx.clone();
        let mut rx = tx.subscribe();
        
        tokio::spawn(async move {
            println!("[Server] New client: {}", addr);
            
            // 4. 接收客户端消息
            while let Ok(msg) = client.receive().await {
                match msg {
                    async_wsocket::Message::Text(text) => {
                        println!("[Server] Received from {}: {}", addr, text);
                        // 广播消息给所有客户端
                        let _ = tx.send(format!("[Broadcast] {}", text));
                    }
                    async_wsocket::Message::Binary(bin) => {
                        println!("[Server] Binary from {}: {:?}", addr, bin);
                        // 回传二进制数据
                        client.send_binary(bin).await.ok();
                    }
                    async_wsocket::Message::Close(_) => {
                        println!("[Server] Client disconnected: {}", addr);
                        break;
                    }
                    _ => {}
                }
            }
        });
    }
    
    Ok(())
}

3. 带TLS加密的客户端示例

use async_wsocket::{ClientBuilder, Error};

#[tokio::main]
async fn main() -> Result<(), Error> {
    // 1. 创建TLS加密连接
    let mut client = ClientBuilder::new()
        .url("wss://secure-server.example.com")?
        .connect()
        .await?;
    
    // 2. 安全通信
    client.send_text("Secret Message").await?;
    
    match client.receive().await? {
        async_wsocket::Message::Text(text) => {
            println!("Secure response: {}", text);
        }
        _ => {}
    }
    
    Ok(())
}

关键功能说明

  1. 双工通信:客户端和服务器同时具备发送/接收能力
  2. 消息类型处理
    • 文本消息:Message::Text
    • 二进制消息:Message::Binary
    • 关闭帧:Message::Close
  3. 并发处理:服务器使用tokio::spawn为每个连接创建独立任务
  4. 错误处理:所有操作返回Result类型,需处理潜在错误

执行流程建议

  1. 先启动服务器程序
  2. 再运行客户端程序
  3. 观察控制台输出的消息交互
  4. 可同时启动多个客户端测试广播功能

注意事项

  1. 测试时请确保端口未被占用
  2. TLS示例需要有效的证书配置
  3. 生产环境建议:
    • 添加更完善的错误处理
    • 实现连接重试机制
    • 使用消息序列化协议(如JSON/Protobuf)
回到顶部