Rust WebSocket库websocket-base的使用,高效实现WebSocket通信与实时数据传输

Rust WebSocket库websocket-base的使用,高效实现WebSocket通信与实时数据传输

安装

在项目目录中运行以下Cargo命令:

cargo add websocket-base

或者在Cargo.toml中添加以下行:

websocket-base = "0.26.5"

基本使用示例

以下是一个使用websocket-base库实现WebSocket通信的完整示例:

use websocket_base::{ClientBuilder, Message, OwnedMessage};
use std::thread;

// WebSocket客户端示例
fn main() {
    // 创建WebSocket客户端连接
    let client = ClientBuilder::new("ws://echo.websocket.org")
        .unwrap()
        .connect_secure(None)
        .unwrap();

    let (mut receiver, mut sender) = client.split().unwrap();

    // 启动接收消息线程
    let receive_thread = thread::spawn(move || {
        for message in receiver.incoming_messages() {
            match message {
                Ok(OwnedMessage::Text(text)) => {
                    println!("Received text message: {}", text);
                }
                Ok(OwnedMessage::Binary(bin)) => {
                    println!("Received binary message: {:?}", bin);
                }
                Ok(OwnedMessage::Close(_)) => {
                    println!("Connection closed");
                    break;
                }
                Ok(OwnedMessage::Ping(data)) => {
                    println!("Received ping, sending pong");
                    sender.send_message(&Message::pong(data)).unwrap();
                }
                Err(e) => {
                    println!("Error receiving message: {}", e);
                    break;
                }
                _ => {}
            }
        }
    });

    // 发送消息
    sender.send_message(&Message::text("Hello, WebSocket!")).unwrap();
    sender.send_message(&Message::binary(vec![1, 2, 3])).unwrap();
    sender.send_message(&Message::close()).unwrap();

    receive_thread.join().unwrap();
}

服务器端示例

use websocket_base::{Server, Message, OwnedMessage};
use std::thread;

// WebSocket服务器示例
fn main() {
    let server = Server::bind("127.0.0.1:8080").unwrap();

    for connection in server.filter_map(Result::ok) {
        thread::spawn(move || {
            let client = connection.accept().unwrap();
            let (mut sender, mut receiver) = client.split().unwrap();

            for message in receiver.incoming_messages() {
                match message {
                    Ok(OwnedMessage::Text(text)) => {
                        println!("Received text: {}", text);
                        sender.send_message(&Message::text(format!("Echo: {}", text))).unwrap();
                    }
                    Ok(OwnedMessage::Binary(bin)) => {
                        println!("Received binary: {:?}", bin);
                        sender.send_message(&Message::binary(bin)).unwrap();
                    }
                    Ok(OwnedMessage::Close(_)) => {
                        println!("Client disconnected");
                        break;
                    }
                    Ok(OwnedMessage::Ping(data)) => {
                        println!("Received ping, sending pong");
                        sender.send_message(&Message::pong(data)).unwrap();
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                        break;
                    }
                    _ => {}
                }
            }
        });
    }
}

完整示例代码

基于上述内容,下面是一个更完整的WebSocket客户端和服务器交互示例:

客户端完整示例

use websocket_base::{ClientBuilder, Message, OwnedMessage};
use std::thread;
use std::time::Duration;

fn main() {
    // 1. 创建WebSocket客户端连接
    let client = ClientBuilder::new("ws://127.0.0.1:8080")
        .expect("创建客户端失败")
        .connect_secure(None)
        .expect("连接服务器失败");

    // 2. 分离读写通道
    let (mut receiver, mut sender) = client.split().expect("分离通道失败");

    // 3. 接收消息线程
    let receive_thread = thread::spawn(move || {
        for message in receiver.incoming_messages() {
            match message {
                Ok(OwnedMessage::Text(text)) => {
                    println!("[客户端] 收到文本消息: {}", text);
                }
                Ok(OwnedMessage::Binary(bin)) => {
                    println!("[客户端] 收到二进制消息: {:?}", bin);
                }
                Ok(OwnedMessage::Close(_)) => {
                    println!("[客户端] 连接关闭");
                    break;
                }
                Ok(OwnedMessage::Ping(data)) => {
                    println!("[客户端] 收到Ping,发送Pong");
                    sender.send_message(&Message::pong(data)).unwrap();
                }
                Err(e) => {
                    println!("[客户端] 接收消息错误: {}", e);
                    break;
                }
                _ => {}
            }
        }
    });

    // 4. 发送消息
    for i in 0..5 {
        let text = format!("消息 {}", i);
        println!("[客户端] 发送: {}", text);
        sender.send_message(&Message::text(text)).unwrap();
        thread::sleep(Duration::from_secs(1));
    }

    // 5. 关闭连接
    sender.send_message(&Message::close()).unwrap();
    receive_thread.join().unwrap();
}

服务器端完整示例

use websocket_base::{Server, Message, OwnedMessage};
use std::thread;
use std::time::Duration;

fn main() {
    // 1. 绑定服务器地址
    let server = Server::bind("127.0.0.1:8080").expect("绑定地址失败");

    println!("WebSocket服务器启动在 ws://127.0.0.1:8080");

    // 2. 处理每个连接
    for connection in server.filter_map(Result::ok) {
        thread::spawn(move || {
            // 3. 接受连接
            let client = connection.accept().expect("接受连接失败");
            let (mut sender, mut receiver) = client.split().expect("分离通道失败");

            println!("[服务器] 新客户端连接");

            // 4. 处理消息
            for message in receiver.incoming_messages() {
                match message {
                    Ok(OwnedMessage::Text(text)) => {
                        println!("[服务器] 收到文本: {}", text);
                        // 返回带前缀的响应
                        let response = format!("服务器响应: {}", text);
                        sender.send_message(&Message::text(response)).unwrap();
                    }
                    Ok(OwnedMessage::Binary(bin)) => {
                        println!("[服务器] 收到二进制: {:?}", bin);
                        // 原样返回二进制数据
                        sender.send_message(&Message::binary(bin)).unwrap();
                    }
                    Ok(OwnedMessage::Close(_)) => {
                        println!("[服务器] 客户端断开连接");
                        break;
                    }
                    Ok(OwnedMessage::Ping(data)) => {
                        println!("[服务器] 收到Ping,发送Pong");
                        sender.send_message(&Message::pong(data)).unwrap();
                    }
                    Err(e) => {
                        println!("[服务器] 错误: {}", e);
                        break;
                    }
                    _ => {}
                }
            }
        });
    }
}

特性

  • 支持WebSocket协议(RFC 6455)
  • 同时支持客户端和服务器端实现
  • 异步和同步API
  • 支持TLS加密连接
  • 支持文本和二进制消息传输
  • 处理Ping/Pong帧以保持连接活跃

1 回复

Rust WebSocket库websocket-base的使用指南

介绍

websocket-base是一个轻量级、高性能的Rust WebSocket库,专为实现高效的WebSocket通信和实时数据传输而设计。它提供了简洁的API接口,支持WebSocket协议的核心功能,包括消息收发、连接管理和错误处理。

主要特性

  • 支持标准WebSocket协议(RFC 6455)
  • 同步和异步API支持
  • 低延迟、高吞吐量的消息传输
  • 内置消息分帧和组装
  • 支持文本和二进制消息格式
  • 可定制的连接超时和重试机制

安装

在Cargo.toml中添加依赖:

[dependencies]
websocket-base = "0.5"

完整示例代码

1. 完整WebSocket客户端示例

use websocket_base::{ClientBuilder, Message};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建带有自定义配置的客户端
    let mut client = ClientBuilder::new("ws://echo.websocket.org")?
        .timeout(Duration::from_secs(5))
        .connect()?;
    
    // 发送文本消息
    client.send(Message::text("Hello, WebSocket!"))?;
    
    // 发送二进制消息
    let binary_data = vec![0x01, 0x02, 0x03, 0x04];
    client.send(Message::binary(binary_data))?;
    
    // 接收和处理消息
    for _ in 0..2 {
        match client.recv()? {
            Message::Text(text) => println!("Received text: {}", text),
            Message::Binary(data) => println!("Received binary: {:?}", data),
            Message::Close(_) => {
                println!("Connection closed by server");
                break;
            }
            _ => println!("Received other frame type"),
        }
    }
    
    // 优雅关闭连接
    client.send(Message::close())?;
    Ok(())
}

2. 完整WebSocket服务器示例

use websocket_base::{Server, Message, Connection};
use std::thread;
use std::sync::Arc;
use std::time::Duration;

// 自定义消息处理器
struct EchoHandler {
    connection_count: Arc<std::sync::atomic::AtomicUsize>,
}

impl EchoHandler {
    fn handle_connection(&self, mut conn: Connection) {
        let conn_id = self.connection_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        println!("New connection #{}", conn_id);
        
        while let Ok(msg) = conn.recv() {
            match msg {
                Message::Text(text) => {
                    println!("[{}] Received text: {}", conn_id, text);
                    let response = format!("Echo #{}: {}", conn_id, text);
                    conn.send(Message::text(response)).unwrap();
                }
                Message::Binary(data) => {
                    println!("[{}] Received binary data ({} bytes)", conn_id, data.len());
                    conn.send(Message::binary(data)).unwrap();
                }
                Message::Close(_) => {
                    println!("[{}] Client closed connection", conn_id);
                    break;
                }
                _ => {}
            }
        }
        
        println!("[{}] Connection closed", conn_id);
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = Server::bind("127.0.0.1:8080")?;
    let handler = EchoHandler {
        connection_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    };
    
    println!("WebSocket server running on ws://127.0.0.1:8080");
    
    for conn in server.listen()? {
        let handler = EchoHandler {
            connection_count: handler.connection_count.clone(),
        };
        thread::spawn(move || {
            handler.handle_connection(conn);
        });
    }
    
    Ok(())
}

3. 完整异步客户端示例

use websocket_base::{AsyncClientBuilder, Message};
use tokio::runtime::Runtime;
use std::time::Duration;

async fn async_client() -> Result<(), Box<dyn std::error::Error>> {
    // 创建异步客户端
    let mut client = AsyncClientBuilder::new("wss://echo.websocket.org")
        .timeout(Duration::from_secs(10))
        .connect()
        .await?;
    
    println!("Async client connected");
    
    // 发送消息
    client.send(Message::text("Async message 1")).await?;
    client.send(Message::text("Async message 2")).await?;
    
    // 接收消息
    for _ in 0..2 {
        if let Message::Text(response) = client.recv().await? {
            println!("Async response: {}", response);
        }
    }
    
    // 发送Ping帧
    client.send(Message::Ping(b"ping".to_vec())).await?;
    
    // 关闭连接
    client.send(Message::close()).await?;
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rt = Runtime::new()?;
    rt.block_on(async_client())
}

性能优化建议

  1. 对于高频消息场景,使用二进制格式而非文本格式
  2. 批量处理消息而不是逐条处理
  3. 使用异步API实现高并发
  4. 适当调整缓冲区大小以适应不同消息大小
  5. 重用WebSocket连接而不是频繁创建新连接

错误处理

use websocket_base::{Error, ClientBuilder};
use std::time::Duration;

fn robust_client() -> Result<(), Box<dyn std::error::Error>> {
    let mut retries = 3;
    let mut delay = Duration::from_secs(1);
    
    while retries > 0 {
        match ClientBuilder::new("ws://echo.websocket.org")
            .timeout(Duration::from_secs(5))
            .connect()
        {
            Ok(mut client) => {
                println!("Connected successfully");
                // 业务逻辑
                return Ok(());
            }
            Err(Error::ConnectionFailed(e)) => {
                eprintln!("Connection failed: {}, retries left: {}", e, retries - 1);
                retries -= 1;
                std::thread::sleep(delay);
                delay *= 2;
            }
            Err(e) => {
                eprintln!("Fatal error: {}", e);
                return Err(Box::new(e));
            }
        }
    }
    
    Err("Failed to connect after retries".into())
}

websocket-base库为Rust开发者提供了简单高效的方式来实现WebSocket通信,适用于需要实时数据传输的各种应用场景。

回到顶部