Rust WebSocket服务器库socketioxide-core的使用:高性能实时通信与Socket.IO协议实现
Rust WebSocket服务器库socketioxide-core的使用:高性能实时通信与Socket.IO协议实现
Socketioxide是一个在Rust中实现的Socket.IO服务器库,与Tower生态系统和Tokio堆栈集成。它可与任何基于Tower的服务器框架(如Axum、Warp、Salvo、Viz或Hyper)集成。
特性
- 与多种框架集成:
- Axum
- Warp
- Hyper
- Salvo
- Viz
- 开箱即用的Tower中间件支持:
- CORS
- 压缩
- 授权
- 可扩展的适配器:
- Redis/Valkey
- MongoDB
- 远程集群通信
- 命名空间和动态命名空间
- 房间
- 确认和带确认的发送
- 二进制包
- 轮询和WebSocket传输
- 默认和Msgpack解析器
- 扩展以向socket添加自定义数据
- 内存高效的HTTP负载流解析
- 类似Axum的灵活API处理事件
- 支持所有Socket.IO版本
示例
聊天应用示例
io.ns("/", |s: SocketRef| {
s.on("new message", |s: SocketRef, Data::<String>(msg)| {
let username = s.extensions.get::<Username>().unwrap().clone();
let msg = Res::Message {
username,
message: msg,
};
s.broadcast().emit("new message", msg).ok();
});
s.on(
"add user",
|s: SocketRef, Data::<String>(username), user_cnt: State<UserCnt>| {
if s.extensions.get::<Username>().is_some() {
return;
}
let num_users = user_cnt.add_user();
s.extensions.insert(Username(username.clone()));
s.emit("login", Res::Login { num_users }).ok();
let res = Res::UserEvent {
num_users,
username: Username(username),
};
s.broadcast().emit("user joined", res).ok();
},
);
s.on("typing", |s: SocketRef| {
let username = s.extensions.get::<Username>().unwrap().clone();
s.broadcast()
.emit("typing", Res::Username { username })
.ok();
});
s.on("stop typing", |s: SocketRef| {
let username = s.extensions.get::<Username>().unwrap().clone();
s.broadcast()
.emit("stop typing", Res::Username { username })
.ok();
});
s.on_disconnect(|s: SocketRef, user_cnt: State<UserCnt>| {
if let Some(username) = s.extensions.get::<Username>() {
let num_users = user_cnt.remove_user();
let res = Res::UserEvent {
num_users,
username: username.clone(),
};
s.broadcast().emit("user left", res).ok();
}
});
});
使用Axum实现的Echo示例
use axum::routing::get;
use serde_json::Value;
use socketioxide::{
extract::{AckSender, Bin, Data, SocketRef},
SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;
fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
info!("Socket.IO connected: {:?} {:?}", socket.ns(), socket.id);
socket.emit("auth", data).ok();
socket.on(
"message",
|socket: SocketRef, Data::<Value>(data), Bin(bin)| {
info!("Received event: {:?} {:?}", data, bin);
socket.bin(bin).emit("message-back", data).ok();
},
);
socket.on(
"message-with-ack",
|Data::<Value>(data), ack: AckSender, Bin(bin)| {
info!("Received event: {:?} {:?}", data, bin);
ack.bin(bin).send(data).ok();
},
);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(FmtSubscriber::default())?;
let (layer, io) = SocketIo::new_layer();
io.ns("/", on_connect);
io.ns("/custom", on_connect);
let app = axum::Router::new()
.route("/", get(|| async { "Hello, World!" }))
.layer(layer);
info!("Starting server");
let listener = tokio::net::TcpListener::bind("0.0.0.0:極:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
Ok(())
}
完整示例代码
下面是一个完整的WebSocket服务器示例,展示了如何使用socketioxide-core实现基本的Echo功能:
use axum::{
routing::get,
Router,
};
use socketioxide::{
extract::{AckSender, Bin, Data, SocketRef},
SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;
// 处理socket连接事件
fn on_connect(socket: SocketRef, Data(data): Data<serde_json::Value>) {
info!("New client connected: {:?}", socket.id);
// 发送欢迎消息
socket.emit("welcome", data).ok();
// 处理消息事件
socket.on(
"message",
|socket: SocketRef, Data::<String>(msg), Bin(bin)| {
info!("Received message: {}", msg);
// 将消息回传给客户端
socket.bin(bin).emit("echo", msg).ok();
},
);
// 处理带确认的消息
socket.on(
"message-with-ack",
|Data::<String>(msg), ack: AckSender| {
info!("Received message with ack: {}", msg);
// 发送确认响应
ack.send(format!("Received: {}", msg)).ok();
},
);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 初始化日志
tracing_subscriber::fmt().init();
// 创建Socket.IO层
let (socket_layer, io) = SocketIo::new_layer();
// 设置命名空间和连接处理器
io.ns("/", on_connect);
io.ns("/custom", on_connect);
// 创建Axum路由
let app = Router::new()
.route("/", get(|| async { "WebSocket Server Running" }))
.layer(socket_layer);
// 启动服务器
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
info!("Server listening on port 3000");
axum::serve(listener, app).await?;
Ok(())
}
贡献和反馈
欢迎任何贡献,可以随时提交问题或PR。如果有任何问题或反馈,请在讨论区创建话题。
许可证
此项目采用MIT许可证。
1 回复
Rust WebSocket服务器库socketioxide-core的使用:高性能实时通信与Socket.IO协议实现
socketioxide-core
是一个用于构建高性能实时通信应用的Rust库,它实现了Socket.IO协议,使开发者能够轻松创建WebSocket服务器。
主要特性
- 完整的Socket.IO协议v4实现
- 高性能异步处理
- 支持命名空间(namespace)和房间(room)的概念
- 内置心跳机制
- 支持广播和定向消息
- 可扩展的中间件系统
安装方法
在Cargo.toml中添加依赖:
[dependencies]
socketioxide-core = "0.7"
tokio = { version = "1.0", features = ["full"] }
基本使用方法
1. 创建基本Socket.IO服务器
use socketioxide::SocketIo;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(10);
let svc = SocketIo::new()
.with_state(tx.clone())
.on_connect(|socket| async move {
println!("Socket connected: {:?}", socket.id);
})
.on_disconnect(|socket| async move {
println!("Socket disconnected: {:?}", socket.id);
})
.on("message", |socket, data: String| async move {
println!("Received message: {}", data);
socket.emit("response", "Message received").ok();
});
// 启动服务器
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
svc.listen(listener).await;
}
2. 处理命名空间
let svc = SocketIo::new()
.ns("/admin", |socket| {
socket.on_connect(|socket| async move {
println!("Admin connected: {:?}", socket.id);
})
})
.ns("/user", |socket| {
socket.on_connect(|socket| async move {
println!("User connected: {:?}", socket.id);
})
});
3. 使用房间功能
.on("join", |socket, room: String| async move {
socket.join(room.clone()).unwrap();
socket.emit("joined", format!("Joined room {}", room)).ok();
})
.on("room_message", |socket, (room, msg): (String, String)| async move {
socket.to(room).emit("room_message", msg).ok();
})
4. 广播消息
.on("broadcast", |socket, msg: String| async move {
socket.broadcast().emit("broadcast_message", msg).ok();
})
客户端连接示例
使用JavaScript客户端连接:
const io = require("socket.io-client");
const socket = io("http://localhost:3000");
socket.on("connect", () => {
console.log("Connected to server");
socket.emit("message", "Hello from client");
socket.on("response", (msg) => {
console.log("Server response:", msg);
});
});
高级功能
自定义适配器
use socketioxide::adapter::LocalAdapter;
let svc = SocketIo::builder()
.with_adapter(LocalAdapter::default())
.build();
使用中间件
.use_fn(|socket, next| async move {
println!("Middleware executed for socket: {:?}", socket.id);
next().await;
})
处理二进制数据
.on("binary", |socket, data: Vec<u8>| async move {
println!("Received binary data of length: {}", data.len());
socket.emit("binary_response", data).ok();
})
性能优化建议
- 对于高并发场景,考虑使用Redis适配器进行水平扩展
- 合理设置ping间隔和超时时间
- 使用房间功能而不是广播所有消息
- 考虑压缩大型消息
完整示例demo
下面是一个完整的聊天服务器示例,结合了命名空间、房间和广播功能:
use socketioxide::SocketIo;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// 创建Socket.IO服务
let svc = SocketIo::new()
// 公共命名空间
.on_connect(|socket| async move {
println!("新连接: {:?}", socket.id);
socket.emit("welcome", "Welcome to public chat").ok();
})
// 聊天命名空间
.ns("/chat", |socket| {
socket
.on_connect(|socket| async move {
println!("用户进入聊天室: {:?}", socket.id);
})
.on("join", |socket, room: String| async move {
// 加入指定房间
socket.join(room.clone()).unwrap();
socket.emit("joined", format!("你已加入房间: {}", room)).ok();
socket.to(room).emit("notification", format!("新用户加入了房间: {}", socket.id)).ok();
})
.on("message", |socket, (room, msg): (String, String)| async move {
// 向房间内所有用户发送消息
println!("收到来自房间 {} 的消息: {}", room, msg);
socket.to(room).emit("message", (socket.id.to_string(), msg)).ok();
})
.on("leave", |socket, room: String| async move {
// 离开房间
socket.leave(room.clone()).unwrap();
socket.emit("left", format!("你已离开房间: {}", room)).ok();
socket.to(room).emit("notification", format!("用户 {} 离开了房间", socket.id)).ok();
})
})
// 管理员命名空间
.ns("/admin", |socket| {
socket
.on_connect(|socket| async move {
println!("管理员连接: {:?}", socket.id);
})
.on("broadcast", |socket, msg: String| async move {
// 向所有连接广播消息
println!("管理员广播: {}", msg);
socket.broadcast().emit("admin_broadcast", msg).ok();
})
});
// 启动服务器
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
println!("服务器运行在: http://localhost:3000");
svc.listen(listener).await;
}
对应的客户端JavaScript代码:
const io = require("socket.io-client");
// 连接到公共命名空间
const publicSocket = io("http://localhost:3000");
publicSocket.on("welcome", (msg) => {
console.log("公共空间消息:", msg);
});
// 连接到聊天命名空间
const chatSocket = io("http://localhost:3000/chat");
chatSocket.on("connect", () => {
console.log("已连接到聊天服务器");
// 加入房间
chatSocket.emit("join", "general");
// 发送消息
chatSocket.emit("message", ["general", "Hello everyone!"]);
// 接收消息
chatSocket.on("message", ([senderId, msg]) => {
console.log(`${senderId}: ${msg}`);
});
// 接收通知
chatSocket.on("notification", (msg) => {
console.log("通知:", msg);
});
});
// 管理员连接示例
const adminSocket = io("http://localhost:3000/admin");
adminSocket.on("connect", () => {
console.log("管理员已连接");
adminSocket.emit("broadcast", "重要系统通知");
});
socketioxide-core
提供了强大的功能来构建实时应用程序,同时保持了Rust的高性能和安全性特点。