Rust WebSocket协议解析库socketioxide-parser-common的使用,支持高效Socket.IO消息解析与协议处理
Rust WebSocket协议解析库socketioxide-parser-common的使用
Socketioxide是一个Rust实现的Socket.IO服务器,它能与Tower生态系统和Tokio栈集成。它可以与任何基于tower的服务器框架集成,如Axum、Warp、Salvo、Viz或Hyper。
主要特性
- 与多种框架集成:
- Axum、Hyper、Salvo、Viz等
- 支持各种基于tower的中间件:
- CORS、压缩、授权等
- 可扩展的适配器:
- Redis/Valkey、MongoDB等
- 远程集群通信
- 命名空间和动态命名空间
- 房间功能
- 确认和带确认的发送
- 二进制数据包
- 轮询和WebSocket传输
- 常用和Msgpack解析器
- 扩展功能为socket添加自定义数据
- 高效的内存HTTP负载解析
- 类似Axum的API处理事件
- 支持所有Socket.IO版本
示例代码
聊天应用示例
io.ns("/", |s: SocketRef| {
s.on("new message", |s: SocketRef, Data::<String>(msg)| {
let username = s.extensions.get::<Username>().unwrap().clone();
let msg = Res::Message {
username,
message: msg,
};
s.broadcast().emit("new message", msg).ok();
});
s.on(
"add user",
|s: SocketRef, Data::<String>(username), user_cnt: State<UserCnt>| {
if s.extensions.get::<Username>().is_some() {
return;
}
let num_users = user_cnt.add_user();
s.extensions.insert(Username(username.clone()));
s.emit("login", Res::Login { num_users }).ok();
let res = Res::UserEvent {
num_users,
username: Username(username),
};
s.broadcast().emit("user joined", res).ok();
},
);
s.on("typing", |s: SocketRef| {
let username = s.extensions.get::<Username>().unwrap().clone();
s.broadcast()
.emit("typing", Res::Username { username })
.ok();
});
s.on("stop typing", |s: SocketRef| {
let username = s.extensions.get::<Username>().unwrap().clone();
s.broadcast()
.emit("stop typing", Res::Username { username })
.ok();
});
s.on_disconnect(|s: SocketRef, user_cnt: State<UserCnt>| {
if let Some(username) = s.extensions.get::<Username>() {
let num_users = user_cnt.remove_user();
let res = Res::UserEvent {
num_users,
username: username.clone(),
};
s.b广播.emit("user left", res).ok();
}
});
});
Axum实现的Echo示例
use axum::routing::get;
use serde_json::Value;
use socketioxide::{
extract::{AckSender, Bin, Data, SocketRef},
SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;
fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
info!("Socket.IO connected: {:?} {:?}", socket.ns(), socket.id);
socket.emit("auth", data).ok();
socket.on(
"message",
|socket: SocketRef, Data::<Value>(data), Bin(bin)| {
info!("Received event: {:?} {:?}", data, bin);
socket.bin(bin).emit("message-back", data).ok();
},
);
socket.on(
"message-with-ack",
|Data::<Value>(data), ack: AckSender, Bin(bin)| {
info!("Received event: {:?} {:?}", data, bin);
ack.bin(bin).send(data).ok();
},
);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(FmtSubscriber::default())?;
let (layer, io) = SocketIo::new_layer();
io.ns("/", on_connect);
io.ns("/custom", on_connect);
let app = axum::Router::new()
.route("/", get(|| async { "Hello, World!" }))
.layer(layer);
info!("Starting server");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
Ok(())
}
完整示例
以下是一个完整的使用socketioxide-parser-common构建WebSocket聊天室的示例:
use axum::{routing::get, Router};
use serde::{Deserialize, Serialize};
use socketioxide::{
extract::{Data, SocketRef, State},
SocketIo,
};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info;
use tracing_subscriber::FmtSubscriber;
// 用户消息结构体
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ChatMessage {
username: String,
content: String,
}
// 用户连接状态
#[derive(Debug, Default)]
struct UserState {
count: usize,
users: Vec<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 初始化日志
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
// 创建共享状态
let state = Arc::new(Mutex::new(UserState::default()));
// 创建Socket.IO层
let (layer, io) = SocketIo::new_layer();
// 事件处理
io.ns("/", move |socket: SocketRef| {
let state = state.clone();
// 连接事件
info!("新用户连接: {:?}", socket.id);
// 处理登录事件
socket.on("login", move |socket: SocketRef, Data::<String>(username)| {
let mut state = state.blocking_lock();
state.count += 1;
state.users.push(username.clone());
info!("用户登录: {}", username);
// 广播新用户加入
socket.broadcast().emit("user_joined", username).ok();
// 返回当前在线用户数
socket.emit("login_success", state.count).ok();
});
// 处理聊天消息
socket.on("chat_message", |socket: SocketRef, Data::<ChatMessage>(msg)| {
info!("收到消息: {:?}", msg);
// 广播消息给所有用户
socket.broadcast().emit("new_message", msg).ok();
});
// 处理断开连接
socket.on_disconnect(move |socket: SocketRef| {
let mut state = state.blocking_lock();
state.count -= 1;
info!("用户断开连接: {:?}", socket.id);
});
});
// 设置路由
let app = Router::new()
.route("/", get(|| async { "WebSocket 聊天服务器" }))
.layer(layer);
// 启动服务器
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
info!("服务器运行在: http://localhost:3000");
axum::serve(listener, app).await?;
Ok(())
}
安装
在项目目录中运行以下Cargo命令:
cargo add socketioxide-parser-common
或者在Cargo.toml中添加:
socketioxide-parser-common = "0.17.0"
许可证
该项目使用MIT许可证。
1 回复
Rust WebSocket协议解析库socketioxide-parser-common使用指南
概述
socketioxide-parser-common
是一个专注于Socket.IO协议解析的Rust库,提供了高效的消息解析和协议处理功能。它专门用于处理WebSocket通信中的Socket.IO协议,适合构建实时应用程序。
主要特性
- 高效的Socket.IO消息解析
- 支持Socket.IO协议版本4
- 低开销的消息处理
- 类型安全的API设计
- 良好的错误处理机制
安装
在Cargo.toml中添加依赖:
[dependencies]
socketioxide-parser-common = "0.1" # 请使用最新版本
完整示例代码
use socketioxide_parser_common::{decode, encode, Packet, PacketType, ParseError};
use bytes::Bytes;
fn main() {
// 示例1: 解析Socket.IO消息
let packet = decode(r#"42["chat message","hello world"]"#).unwrap();
match packet {
Packet::Event(_, data) => {
println!("[示例1] 收到事件数据: {:?}", data);
}
_ => println!("[示例1] 收到其他类型的数据包"),
}
// 示例2: 创建和编码Socket.IO消息
let packet = Packet::Event(
"/".to_string(),
vec!["chat message".into(), "hello from rust".into()]
);
let encoded = encode(&packet).unwrap();
println!("[示例2] 编码后的数据包: {}", encoded);
// 示例3: 处理不同类型的数据包
let packets = [
r#"0{"sid":"abc123"}"#, // 连接
r#"1"#, // 断开连接
r#"2["ping"]"#, // 事件
r#"3["pong"]"#, // 确认
r#"4"#, // 错误
];
for msg in packets {
let packet = decode(msg).unwrap();
match packet {
Packet::Connect(_, sid) => println!("[示例3] 已连接, sid: {}", sid),
Packet::Disconnect(_) => println!("[示例3] 已断开连接"),
Packet::Event(_, data) => println!("[示例3] 收到事件: {:?}", data),
Packet::Ack(_, data) => println!("[示例3] 收到确认: {:?}", data),
Packet::Error(_, reason) => println!("[示例3] 错误: {:?}", reason),
_ => println!("[示例3] 其他类型的数据包"),
}
}
// 示例4: 自定义命名空间处理
let msg = r#"/admin,42["user joined",{"id":123}]"#;
let packet = decode(msg).unwrap();
if let Packet::Event(ns, data) = packet {
println!("[示例4] 命名空间: {}, 数据: {:?}", ns, data);
}
// 示例5: 二进制数据支持
let binary_data = Bytes::from(vec![1, 2, 3, 4]);
let packet = Packet::BinaryEvent(
"/".to_string(),
vec!["image".into(), binary_data.into()]
);
let encoded = encode(&packet).unwrap();
println!("[示例5] 编码后的二进制数据包: {}", encoded);
let decoded = decode(&encoded).unwrap();
println!("[示例5] 解码后的数据包: {:?}", decoded);
// 示例6: 错误处理
match decode("invalid message") {
Ok(packet) => println!("[示例6] 有效数据包: {:?}", packet),
Err(ParseError::InvalidPacketFormat) => println!("[示例6] 无效的消息格式"),
Err(ParseError::InvalidPacketType) => println!("[示例6] 无效的数据包类型"),
Err(e) => println!("[示例6] 其他错误: {:?}", e),
}
}
性能建议
- 对于高频消息场景,重用解码器实例
- 预分配缓冲区处理大量消息
- 考虑使用零拷贝技术处理二进制数据
总结
socketioxide-parser-common
提供了简单而强大的API来处理Socket.IO协议,使Rust开发者能够轻松构建实时WebSocket应用。通过类型安全的解析和编码功能,它可以有效减少协议处理中的错误,同时保持高性能。