Rust WebSocket库websocket-base的使用,高效实现WebSocket通信与实时数据传输
Rust WebSocket库websocket-base的使用,高效实现WebSocket通信与实时数据传输
安装
在项目目录中运行以下Cargo命令:
cargo add websocket-base
或者在Cargo.toml中添加以下行:
websocket-base = "0.26.5"
基本使用示例
以下是一个使用websocket-base库实现WebSocket通信的完整示例:
use websocket_base::{ClientBuilder, Message, OwnedMessage};
use std::thread;
// WebSocket客户端示例
fn main() {
// 创建WebSocket客户端连接
let client = ClientBuilder::new("ws://echo.websocket.org")
.unwrap()
.connect_secure(None)
.unwrap();
let (mut receiver, mut sender) = client.split().unwrap();
// 启动接收消息线程
let receive_thread = thread::spawn(move || {
for message in receiver.incoming_messages() {
match message {
Ok(OwnedMessage::Text(text)) => {
println!("Received text message: {}", text);
}
Ok(OwnedMessage::Binary(bin)) => {
println!("Received binary message: {:?}", bin);
}
Ok(OwnedMessage::Close(_)) => {
println!("Connection closed");
break;
}
Ok(OwnedMessage::Ping(data)) => {
println!("Received ping, sending pong");
sender.send_message(&Message::pong(data)).unwrap();
}
Err(e) => {
println!("Error receiving message: {}", e);
break;
}
_ => {}
}
}
});
// 发送消息
sender.send_message(&Message::text("Hello, WebSocket!")).unwrap();
sender.send_message(&Message::binary(vec![1, 2, 3])).unwrap();
sender.send_message(&Message::close()).unwrap();
receive_thread.join().unwrap();
}
服务器端示例
use websocket_base::{Server, Message, OwnedMessage};
use std::thread;
// WebSocket服务器示例
fn main() {
let server = Server::bind("127.0.0.1:8080").unwrap();
for connection in server.filter_map(Result::ok) {
thread::spawn(move || {
let client = connection.accept().unwrap();
let (mut sender, mut receiver) = client.split().unwrap();
for message in receiver.incoming_messages() {
match message {
Ok(OwnedMessage::Text(text)) => {
println!("Received text: {}", text);
sender.send_message(&Message::text(format!("Echo: {}", text))).unwrap();
}
Ok(OwnedMessage::Binary(bin)) => {
println!("Received binary: {:?}", bin);
sender.send_message(&Message::binary(bin)).unwrap();
}
Ok(OwnedMessage::Close(_)) => {
println!("Client disconnected");
break;
}
Ok(OwnedMessage::Ping(data)) => {
println!("Received ping, sending pong");
sender.send_message(&Message::pong(data)).unwrap();
}
Err(e) => {
println!("Error: {}", e);
break;
}
_ => {}
}
}
});
}
}
完整示例代码
基于上述内容,下面是一个更完整的WebSocket客户端和服务器交互示例:
客户端完整示例
use websocket_base::{ClientBuilder, Message, OwnedMessage};
use std::thread;
use std::time::Duration;
fn main() {
// 1. 创建WebSocket客户端连接
let client = ClientBuilder::new("ws://127.0.0.1:8080")
.expect("创建客户端失败")
.connect_secure(None)
.expect("连接服务器失败");
// 2. 分离读写通道
let (mut receiver, mut sender) = client.split().expect("分离通道失败");
// 3. 接收消息线程
let receive_thread = thread::spawn(move || {
for message in receiver.incoming_messages() {
match message {
Ok(OwnedMessage::Text(text)) => {
println!("[客户端] 收到文本消息: {}", text);
}
Ok(OwnedMessage::Binary(bin)) => {
println!("[客户端] 收到二进制消息: {:?}", bin);
}
Ok(OwnedMessage::Close(_)) => {
println!("[客户端] 连接关闭");
break;
}
Ok(OwnedMessage::Ping(data)) => {
println!("[客户端] 收到Ping,发送Pong");
sender.send_message(&Message::pong(data)).unwrap();
}
Err(e) => {
println!("[客户端] 接收消息错误: {}", e);
break;
}
_ => {}
}
}
});
// 4. 发送消息
for i in 0..5 {
let text = format!("消息 {}", i);
println!("[客户端] 发送: {}", text);
sender.send_message(&Message::text(text)).unwrap();
thread::sleep(Duration::from_secs(1));
}
// 5. 关闭连接
sender.send_message(&Message::close()).unwrap();
receive_thread.join().unwrap();
}
服务器端完整示例
use websocket_base::{Server, Message, OwnedMessage};
use std::thread;
use std::time::Duration;
fn main() {
// 1. 绑定服务器地址
let server = Server::bind("127.0.0.1:8080").expect("绑定地址失败");
println!("WebSocket服务器启动在 ws://127.0.0.1:8080");
// 2. 处理每个连接
for connection in server.filter_map(Result::ok) {
thread::spawn(move || {
// 3. 接受连接
let client = connection.accept().expect("接受连接失败");
let (mut sender, mut receiver) = client.split().expect("分离通道失败");
println!("[服务器] 新客户端连接");
// 4. 处理消息
for message in receiver.incoming_messages() {
match message {
Ok(OwnedMessage::Text(text)) => {
println!("[服务器] 收到文本: {}", text);
// 返回带前缀的响应
let response = format!("服务器响应: {}", text);
sender.send_message(&Message::text(response)).unwrap();
}
Ok(OwnedMessage::Binary(bin)) => {
println!("[服务器] 收到二进制: {:?}", bin);
// 原样返回二进制数据
sender.send_message(&Message::binary(bin)).unwrap();
}
Ok(OwnedMessage::Close(_)) => {
println!("[服务器] 客户端断开连接");
break;
}
Ok(OwnedMessage::Ping(data)) => {
println!("[服务器] 收到Ping,发送Pong");
sender.send_message(&Message::pong(data)).unwrap();
}
Err(e) => {
println!("[服务器] 错误: {}", e);
break;
}
_ => {}
}
}
});
}
}
特性
- 支持WebSocket协议(RFC 6455)
- 同时支持客户端和服务器端实现
- 异步和同步API
- 支持TLS加密连接
- 支持文本和二进制消息传输
- 处理Ping/Pong帧以保持连接活跃
1 回复
Rust WebSocket库websocket-base的使用指南
介绍
websocket-base是一个轻量级、高性能的Rust WebSocket库,专为实现高效的WebSocket通信和实时数据传输而设计。它提供了简洁的API接口,支持WebSocket协议的核心功能,包括消息收发、连接管理和错误处理。
主要特性
- 支持标准WebSocket协议(RFC 6455)
- 同步和异步API支持
- 低延迟、高吞吐量的消息传输
- 内置消息分帧和组装
- 支持文本和二进制消息格式
- 可定制的连接超时和重试机制
安装
在Cargo.toml中添加依赖:
[dependencies]
websocket-base = "0.5"
完整示例代码
1. 完整WebSocket客户端示例
use websocket_base::{ClientBuilder, Message};
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建带有自定义配置的客户端
let mut client = ClientBuilder::new("ws://echo.websocket.org")?
.timeout(Duration::from_secs(5))
.connect()?;
// 发送文本消息
client.send(Message::text("Hello, WebSocket!"))?;
// 发送二进制消息
let binary_data = vec![0x01, 0x02, 0x03, 0x04];
client.send(Message::binary(binary_data))?;
// 接收和处理消息
for _ in 0..2 {
match client.recv()? {
Message::Text(text) => println!("Received text: {}", text),
Message::Binary(data) => println!("Received binary: {:?}", data),
Message::Close(_) => {
println!("Connection closed by server");
break;
}
_ => println!("Received other frame type"),
}
}
// 优雅关闭连接
client.send(Message::close())?;
Ok(())
}
2. 完整WebSocket服务器示例
use websocket_base::{Server, Message, Connection};
use std::thread;
use std::sync::Arc;
use std::time::Duration;
// 自定义消息处理器
struct EchoHandler {
connection_count: Arc<std::sync::atomic::AtomicUsize>,
}
impl EchoHandler {
fn handle_connection(&self, mut conn: Connection) {
let conn_id = self.connection_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
println!("New connection #{}", conn_id);
while let Ok(msg) = conn.recv() {
match msg {
Message::Text(text) => {
println!("[{}] Received text: {}", conn_id, text);
let response = format!("Echo #{}: {}", conn_id, text);
conn.send(Message::text(response)).unwrap();
}
Message::Binary(data) => {
println!("[{}] Received binary data ({} bytes)", conn_id, data.len());
conn.send(Message::binary(data)).unwrap();
}
Message::Close(_) => {
println!("[{}] Client closed connection", conn_id);
break;
}
_ => {}
}
}
println!("[{}] Connection closed", conn_id);
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = Server::bind("127.0.0.1:8080")?;
let handler = EchoHandler {
connection_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
println!("WebSocket server running on ws://127.0.0.1:8080");
for conn in server.listen()? {
let handler = EchoHandler {
connection_count: handler.connection_count.clone(),
};
thread::spawn(move || {
handler.handle_connection(conn);
});
}
Ok(())
}
3. 完整异步客户端示例
use websocket_base::{AsyncClientBuilder, Message};
use tokio::runtime::Runtime;
use std::time::Duration;
async fn async_client() -> Result<(), Box<dyn std::error::Error>> {
// 创建异步客户端
let mut client = AsyncClientBuilder::new("wss://echo.websocket.org")
.timeout(Duration::from_secs(10))
.connect()
.await?;
println!("Async client connected");
// 发送消息
client.send(Message::text("Async message 1")).await?;
client.send(Message::text("Async message 2")).await?;
// 接收消息
for _ in 0..2 {
if let Message::Text(response) = client.recv().await? {
println!("Async response: {}", response);
}
}
// 发送Ping帧
client.send(Message::Ping(b"ping".to_vec())).await?;
// 关闭连接
client.send(Message::close()).await?;
Ok(())
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let rt = Runtime::new()?;
rt.block_on(async_client())
}
性能优化建议
- 对于高频消息场景,使用二进制格式而非文本格式
- 批量处理消息而不是逐条处理
- 使用异步API实现高并发
- 适当调整缓冲区大小以适应不同消息大小
- 重用WebSocket连接而不是频繁创建新连接
错误处理
use websocket_base::{Error, ClientBuilder};
use std::time::Duration;
fn robust_client() -> Result<(), Box<dyn std::error::Error>> {
let mut retries = 3;
let mut delay = Duration::from_secs(1);
while retries > 0 {
match ClientBuilder::new("ws://echo.websocket.org")
.timeout(Duration::from_secs(5))
.connect()
{
Ok(mut client) => {
println!("Connected successfully");
// 业务逻辑
return Ok(());
}
Err(Error::ConnectionFailed(e)) => {
eprintln!("Connection failed: {}, retries left: {}", e, retries - 1);
retries -= 1;
std::thread::sleep(delay);
delay *= 2;
}
Err(e) => {
eprintln!("Fatal error: {}", e);
return Err(Box::new(e));
}
}
}
Err("Failed to connect after retries".into())
}
websocket-base库为Rust开发者提供了简单高效的方式来实现WebSocket通信,适用于需要实时数据传输的各种应用场景。