Rust实时通信库rust_engineio的使用,实现WebSocket和Engine.IO协议的高效客户端与服务器交互
以下是关于Rust实时通信库rust_engineio的使用内容,包含WebSocket和Engine.IO协议的高效客户端与服务器交互的完整示例:
Rust-engineio-client
一个用Rust编程语言实现的engine.io客户端。该实现目前支持engine.io协议的第4版修订版。如果您在使用此客户端时遇到任何连接问题,请确保服务器至少使用engine.io协议的第4版修订版。
示例用法
use rust_engineio::{ClientBuilder, Client, packet::{Packet, PacketId}};
use url::Url;
use bytes::Bytes;
// 获取带有`on_open`回调的客户端
let client: Client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
.on_open(|_| println!("Connection opened!"))
.build()
.expect("Connection failed");
// 连接到服务器
client.connect().expect("Connection failed");
// 创建一个数据包,这里是一个消息数据包并发送它
let packet = Packet::new(PacketId::Message, Bytes::from_static(b"Hello World"));
client.emit(packet).expect("Server unreachable");
// 从服务器断开连接
client.disconnect().expect("Disconnect failed")
完整示例代码
下面是一个更完整的示例,展示如何使用rust_engineio库实现WebSocket和Engine.IO协议的客户端与服务器交互:
use rust_engineio::{ClientBuilder, Client, packet::{Packet, PacketId}};
use url::Url;
use bytes::Bytes;
use std::time::Duration;
fn main() {
// 创建客户端连接
let client: Client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
.on_open(|_| println!("连接已建立!"))
.on_close(|_| println!("连接已关闭"))
.on_error(|err| println!("发生错误: {:?}", err))
.on_data(|data| println!("收到数据: {:?}", data))
.reconnect_interval(Duration::from_secs(5)) // 设置重连间隔
.build()
.expect("客户端创建失败");
// 连接到服务器
client.connect().expect("连接失败");
// 发送消息
let message = Packet::new(PacketId::Message, Bytes::from_static(b"Hello Server"));
client.emit(message).expect("消息发送失败");
// 保持连接一段时间
std::thread::sleep(Duration::from_secs(10));
// 断开连接
client.disconnect().expect("断开连接失败");
}
异步版本示例
要使用异步版本,需要在Cargo.toml中添加以下依赖:
[dependencies]
rust-engineio = { version = "0.6.0", features = ["async"] }
tokio = { version = "1.0", features = ["full"] }
然后可以使用以下异步代码:
use rust_engineio::asynchronous::{ClientBuilder, Client};
use url::Url;
use bytes::Bytes;
use std::time::Duration;
#[tokio::main]
async fn main() {
// 创建异步客户端
let client: Client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
.on_open(|_| println!("连接已建立!"))
.on_close(|_| println!("连接已关闭"))
.on_error(|err| println!("发生错误: {:?}", err))
.on_data(|data| println!("收到数据: {:?}", data))
.reconnect_interval(Duration::from_secs(5))
.build()
.await
.expect("客户端创建失败");
// 异步连接
client.connect().await.expect("连接失败");
// 异步发送消息
let message = Packet::new(PacketId::Message, Bytes::from_static(b"Hello Server"));
client.emit(message).await.expect("消息发送失败");
// 保持连接
tokio::time::sleep(Duration::from_secs(10)).await;
// 异步断开连接
client.disconnect().await.expect("断开连接失败");
}
可用连接方法
该库提供了多种连接方法:
build
: 如果允许则建立WebSocket连接,否则回退到轮询。标准配置。build_polling
: 强制使用轮询传输。build_websocket_with_upgrade
: 先用轮询传输建立连接,然后升级到WebSocket传输(如果可能)。build_websocket
: 仅用WebSocket传输建立连接,如果WebSocket不被允许则会崩溃。
当前功能
该实现支持engine.io协议的所有功能,包括:
- 多种传输选项
- 发送engine.io数据包的能力
- 注册常见的engine.io事件回调:
- on_open
- on_close
- on_data
- on_error
- on_packet
还可以通过TlsConnector
传入自定义TLS配置,以及为初始请求提供自定义头信息。
完整示例Demo
以下是一个完整的实时聊天应用示例,结合了同步和异步版本的最佳实践:
// 同步版本完整示例
use rust_engineio::{ClientBuilder, Client, packet::{Packet, PacketId}};
use url::Url;
use bytes::Bytes;
use std::time::Duration;
use std::io;
fn main() -> io::Result<()> {
println!("正在启动Engine.IO客户端...");
// 创建客户端连接
let client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
.on_open(|_| println!("✅ 已连接到服务器"))
.on_close(|_| println!("❌ 连接已关闭"))
.on_error(|err| eprintln!("⚠️ 连接错误: {:?}", err))
.on_data(|data| {
if let Packet::Message(msg) = data {
println!("📩 收到消息: {}", String::from_utf8_lossy(&msg));
}
})
.reconnect_interval(Duration::from_secs(3))
.build()?;
// 连接到服务器
client.connect()?;
// 发送欢迎消息
let welcome = Packet::new(PacketId::Message, Bytes::from("用户已加入聊天"));
client.emit(welcome)?;
// 创建消息循环
loop {
let mut input = String::new();
io::stdin().read_line(&mut input)?;
if input.trim().eq_ignore_ascii_case("exit") {
break;
}
let packet = Packet::new(PacketId::Message, Bytes::from(input.trim()));
client.emit(packet)?;
}
// 断开连接
let goodbye = Packet::new(PacketId::Message, Bytes::from("用户已离开聊天"));
client.emit(goodbye)?;
client.disconnect()?;
Ok(())
}
// 异步版本完整示例
use rust_engineio::asynchronous::{ClientBuilder, Client};
use url::Url;
use bytes::Bytes;
use std::time::Duration;
use tokio::io::{self, AsyncBufReadExt, BufReader};
#[tokio::main]
async fn main() -> io::Result<()> {
println!("正在启动异步Engine.IO客户端...");
// 创建异步客户端
let client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
.on_open(|_| println!("✅ 已连接到服务器"))
.on_close(|_| println!("❌ 连接已关闭"))
.on_error(|err| eprintln!("⚠️ 连接错误: {:?}", err))
.on_data(|data| {
if let Packet::Message(msg) = data {
println!("📩 收到消息: {}", String::from_utf8_lossy(&msg));
}
})
.reconnect_interval(Duration::from_secs(3))
.build()
.await?;
// 异步连接
client.connect().await?;
// 发送欢迎消息
let welcome = Packet::new(PacketId::Message, Bytes::from("用户已加入聊天"));
client.emit(welcome).await?;
// 创建异步消息循环
let stdin = BufReader::new(io::stdin());
let mut lines = stdin.lines();
while let Ok(Some(line)) = lines.next_line().await {
if line.trim().eq_ignore_ascii_case("exit") {
break;
}
let packet = Packet::new(PacketId::Message, Bytes::from(line.trim()));
client.emit(packet).await?;
}
// 断开连接
let goodbye = Packet::new(PacketId::Message, Bytes::from("用户已离开聊天"));
client.emit(goodbye).await?;
client.disconnect().await?;
Ok(())
}
1 回复
Rust实时通信库rust_engineio的使用指南
概述
rust_engineio是一个实现了Engine.IO协议的Rust库,支持WebSocket和轮询传输,可用于构建实时通信的客户端和服务器应用。Engine.IO是Socket.IO的基础协议,提供了可靠的实时双向通信能力。
主要特性
- 支持Engine.IO协议v3和v4
- 同时支持WebSocket和HTTP轮询传输
- 异步实现(基于tokio)
- 客户端和服务器端实现
- 消息和二进制数据支持
安装
在Cargo.toml中添加依赖:
[dependencies]
rust_engineio = "0.4"
tokio = { version = "1.0", features = ["full"] }
完整示例代码
服务器端完整示例
use rust_engineio::{Server, ServerBuilder, Socket, Packet};
use futures::StreamExt;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 创建Engine.IO服务器
let mut server = ServerBuilder::new()
.ping_interval(Duration::from_secs(5))
.ping_timeout(Duration::from_secs(10))
.max_payload(1_000_000)
.build();
// 处理新连接
let mut new_connections = server.sockets();
tokio::spawn(async move {
while let Some(socket) = new_connections.next().await {
println!("New client connected: {:?}", socket.id);
handle_connection(socket).await;
}
});
// 启动服务器
server.bind("127.0.0.1:3000").await.unwrap();
println!("Server started on 127.0.0.1:3000");
// 保持服务器运行
loop {
sleep(Duration::from_secs(10)).await;
}
}
async fn handle_connection(socket: Socket) {
let (mut sender, mut receiver) = socket.split();
// 处理接收到的消息
tokio::spawn(async move {
while let Some(packet) = receiver.next().await {
match packet {
Ok(packet) => {
match packet {
Packet::Message(text) => {
println!("Received message: {}", text);
// 处理自定义事件
if text.starts_with("event:") {
let event_data = &text[6..];
let response = format!("event_response:{}", event_data);
sender.send(Packet::Message(response)).await.unwrap();
} else {
// 回声消息
sender.send(Packet::Message(text)).await.unwrap();
}
}
Packet::Binary(data) => {
println!("Received binary data: {:?} bytes", data.len());
// 回声二进制数据
sender.send(Packet::Binary(data)).await.unwrap();
}
_ => {}
}
}
Err(e) => {
eprintln!("Error receiving packet: {}", e);
break;
}
}
}
println!("Client disconnected");
});
}
客户端完整示例
use rust_engineio::{Client, ClientBuilder, Packet};
use futures::StreamExt;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 创建Engine.IO客户端
let mut client = ClientBuilder::new("http://127.0.0.1:3000")
.ping_interval(Duration::from_secs(5))
.ping_timeout(Duration::from_secs(10))
.reconnection(true)
.reconnection_attempts(5)
.reconnection_delay(1000)
.connect()
.await
.expect("Failed to connect to server");
println!("Connected to server");
// 发送文本消息
client.emit("Hello from client!".into()).await.unwrap();
// 发送自定义事件
client.emit("event:custom_event_data".into()).await.unwrap();
// 发送二进制数据
let binary_data = vec![0x01, 0x02, 0x03, 0x04];
client.emit_binary(binary_data.into()).await.unwrap();
// 接收消息
tokio::spawn(async move {
while let Some(packet) = client.next().await {
match packet {
Ok(packet) => {
match packet {
Packet::Message(text) => println!("Received message: {}", text),
Packet::Binary(data) => println!("Received binary: {:?}", data),
_ => {}
}
}
Err(e) => {
eprintln!("Error receiving packet: {}", e);
break;
}
}
}
});
// 保持客户端运行
loop {
sleep(Duration::from_secs(5)).await;
// 定期发送心跳消息
client.emit("heartbeat".into()).await.unwrap();
}
}
配置选项
ServerBuilder和ClientBuilder提供的主要配置选项:
// 服务器配置示例
let server = ServerBuilder::new()
.ping_interval(Duration::from_secs(5)) // 心跳间隔
.ping_timeout(Duration::from_secs(10)) // 心跳超时
.max_payload(1_000_000) // 最大负载大小(字节)
.allow_upgrades(true) // 是否允许传输升级
.build();
// 客户端配置示例
let client = ClientBuilder::new("http://127.0.0.1:3000")
.reconnection(true) // 是否自动重连
.reconnection_attempts(5) // 最大重连尝试次数
.reconnection_delay(1000) // 重连延迟(毫秒)
.build();
错误处理建议
// 连接错误处理
match client.connect().await {
Ok(client) => {
// 连接成功处理
}
Err(e) => {
eprintln!("Connection failed: {}", e);
// 实现重连逻辑
}
}
// 消息发送错误处理
if let Err(e) = client.emit("message".into()).await {
eprintln!("Failed to send message: {}", e);
}
性能优化建议
- 对于高吞吐量场景,优先使用WebSocket传输模式
- 合理设置心跳间隔(通常5-25秒)和超时时间(心跳间隔的2-3倍)
- 对于大量小消息,考虑批量发送以减少协议开销
- 二进制数据比文本数据更高效,特别是对于非文本内容
- 在高并发场景下,考虑使用连接池管理客户端连接