Rust WebSocket服务器库socketioxide-core的使用:高性能实时通信与Socket.IO协议实现

Rust WebSocket服务器库socketioxide-core的使用:高性能实时通信与Socket.IO协议实现

Socketioxide是一个在Rust中实现的Socket.IO服务器库,与Tower生态系统和Tokio堆栈集成。它可与任何基于Tower的服务器框架(如Axum、Warp、Salvo、Viz或Hyper)集成。

Socketioxide logo

特性

  • 与多种框架集成:
    • Axum
    • Warp
    • Hyper
    • Salvo
    • Viz
  • 开箱即用的Tower中间件支持:
    • CORS
    • 压缩
    • 授权
  • 可扩展的适配器:
    • Redis/Valkey
    • MongoDB
  • 远程集群通信
  • 命名空间和动态命名空间
  • 房间
  • 确认和带确认的发送
  • 二进制包
  • 轮询和WebSocket传输
  • 默认和Msgpack解析器
  • 扩展以向socket添加自定义数据
  • 内存高效的HTTP负载流解析
  • 类似Axum的灵活API处理事件
  • 支持所有Socket.IO版本

示例

聊天应用示例

io.ns("/", |s: SocketRef| {
    s.on("new message", |s: SocketRef, Data::<String>(msg)| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        let msg = Res::Message {
            username,
            message: msg,
        };
        s.broadcast().emit("new message", msg).ok();
    });

    s.on(
        "add user",
        |s: SocketRef, Data::<String>(username), user_cnt: State<UserCnt>| {
            if s.extensions.get::<Username>().is_some() {
                return;
            }
            let num_users = user_cnt.add_user();
            s.extensions.insert(Username(username.clone()));
            s.emit("login", Res::Login { num_users }).ok();

            let res = Res::UserEvent {
                num_users,
                username: Username(username),
            };
            s.broadcast().emit("user joined", res).ok();
        },
    );

    s.on("typing", |s: SocketRef| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        s.broadcast()
            .emit("typing", Res::Username { username })
            .ok();
    });

    s.on("stop typing", |s: SocketRef| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        s.broadcast()
            .emit("stop typing", Res::Username { username })
            .ok();
    });

    s.on_disconnect(|s: SocketRef, user_cnt: State<UserCnt>| {
        if let Some(username) = s.extensions.get::<Username>() {
            let num_users = user_cnt.remove_user();
            let res = Res::UserEvent {
                num_users,
                username: username.clone(),
            };
            s.broadcast().emit("user left", res).ok();
        }
    });
});

使用Axum实现的Echo示例

use axum::routing::get;
use serde_json::Value;
use socketioxide::{
    extract::{AckSender, Bin, Data, SocketRef},
    SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;

fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
    info!("Socket.IO connected: {:?} {:?}", socket.ns(), socket.id);
    socket.emit("auth", data).ok();

    socket.on(
        "message",
        |socket: SocketRef, Data::<Value>(data), Bin(bin)| {
            info!("Received event: {:?} {:?}", data, bin);
            socket.bin(bin).emit("message-back", data).ok();
        },
    );

    socket.on(
        "message-with-ack",
        |Data::<Value>(data), ack: AckSender, Bin(bin)| {
            info!("Received event: {:?} {:?}", data, bin);
            ack.bin(bin).send(data).ok();
        },
    );
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing::subscriber::set_global_default(FmtSubscriber::default())?;

    let (layer, io) = SocketIo::new_layer();

    io.ns("/", on_connect);
    io.ns("/custom", on_connect);

    let app = axum::Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .layer(layer);

    info!("Starting server");

    let listener = tokio::net::TcpListener::bind("0.0.0.0:極:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();

    Ok(())
}

完整示例代码

下面是一个完整的WebSocket服务器示例,展示了如何使用socketioxide-core实现基本的Echo功能:

use axum::{
    routing::get,
    Router,
};
use socketioxide::{
    extract::{AckSender, Bin, Data, SocketRef},
    SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;

// 处理socket连接事件
fn on_connect(socket: SocketRef, Data(data): Data<serde_json::Value>) {
    info!("New client connected: {:?}", socket.id);
    
    // 发送欢迎消息
    socket.emit("welcome", data).ok();
    
    // 处理消息事件
    socket.on(
        "message",
        |socket: SocketRef, Data::<String>(msg), Bin(bin)| {
            info!("Received message: {}", msg);
            // 将消息回传给客户端
            socket.bin(bin).emit("echo", msg).ok();
        },
    );
    
    // 处理带确认的消息
    socket.on(
        "message-with-ack", 
        |Data::<String>(msg), ack: AckSender| {
            info!("Received message with ack: {}", msg);
            // 发送确认响应
            ack.send(format!("Received: {}", msg)).ok();
        },
    );
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 初始化日志
    tracing_subscriber::fmt().init();
    
    // 创建Socket.IO层
    let (socket_layer, io) = SocketIo::new_layer();
    
    // 设置命名空间和连接处理器
    io.ns("/", on_connect);
    io.ns("/custom", on_connect);
    
    // 创建Axum路由
    let app = Router::new()
        .route("/", get(|| async { "WebSocket Server Running" }))
        .layer(socket_layer);
    
    // 启动服务器
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    info!("Server listening on port 3000");
    
    axum::serve(listener, app).await?;
    
    Ok(())
}

贡献和反馈

欢迎任何贡献,可以随时提交问题或PR。如果有任何问题或反馈,请在讨论区创建话题。

许可证

此项目采用MIT许可证。


1 回复

Rust WebSocket服务器库socketioxide-core的使用:高性能实时通信与Socket.IO协议实现

socketioxide-core 是一个用于构建高性能实时通信应用的Rust库,它实现了Socket.IO协议,使开发者能够轻松创建WebSocket服务器。

主要特性

  • 完整的Socket.IO协议v4实现
  • 高性能异步处理
  • 支持命名空间(namespace)和房间(room)的概念
  • 内置心跳机制
  • 支持广播和定向消息
  • 可扩展的中间件系统

安装方法

在Cargo.toml中添加依赖:

[dependencies]
socketioxide-core = "0.7"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 创建基本Socket.IO服务器

use socketioxide::SocketIo;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(10);
    
    let svc = SocketIo::new()
        .with_state(tx.clone())
        .on_connect(|socket| async move {
            println!("Socket connected: {:?}", socket.id);
        })
        .on_disconnect(|socket| async move {
            println!("Socket disconnected: {:?}", socket.id);
        })
        .on("message", |socket, data: String| async move {
            println!("Received message: {}", data);
            socket.emit("response", "Message received").ok();
        });

    // 启动服务器
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    svc.listen(listener).await;
}

2. 处理命名空间

let svc = SocketIo::new()
    .ns("/admin", |socket| {
        socket.on_connect(|socket| async move {
            println!("Admin connected: {:?}", socket.id);
        })
    })
    .ns("/user", |socket| {
        socket.on_connect(|socket| async move {
            println!("User connected: {:?}", socket.id);
        })
    });

3. 使用房间功能

.on("join", |socket, room: String| async move {
    socket.join(room.clone()).unwrap();
    socket.emit("joined", format!("Joined room {}", room)).ok();
})
.on("room_message", |socket, (room, msg): (String, String)| async move {
    socket.to(room).emit("room_message", msg).ok();
})

4. 广播消息

.on("broadcast", |socket, msg: String| async move {
    socket.broadcast().emit("broadcast_message", msg).ok();
})

客户端连接示例

使用JavaScript客户端连接:

const io = require("socket.io-client");

const socket = io("http://localhost:3000");

socket.on("connect", () => {
    console.log("Connected to server");
    
    socket.emit("message", "Hello from client");
    
    socket.on("response", (msg) => {
        console.log("Server response:", msg);
    });
});

高级功能

自定义适配器

use socketioxide::adapter::LocalAdapter;

let svc = SocketIo::builder()
    .with_adapter(LocalAdapter::default())
    .build();

使用中间件

.use_fn(|socket, next| async move {
    println!("Middleware executed for socket: {:?}", socket.id);
    next().await;
})

处理二进制数据

.on("binary", |socket, data: Vec<u8>| async move {
    println!("Received binary data of length: {}", data.len());
    socket.emit("binary_response", data).ok();
})

性能优化建议

  1. 对于高并发场景,考虑使用Redis适配器进行水平扩展
  2. 合理设置ping间隔和超时时间
  3. 使用房间功能而不是广播所有消息
  4. 考虑压缩大型消息

完整示例demo

下面是一个完整的聊天服务器示例,结合了命名空间、房间和广播功能:

use socketioxide::SocketIo;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建Socket.IO服务
    let svc = SocketIo::new()
        // 公共命名空间
        .on_connect(|socket| async move {
            println!("新连接: {:?}", socket.id);
            socket.emit("welcome", "Welcome to public chat").ok();
        })
        // 聊天命名空间
        .ns("/chat", |socket| {
            socket
                .on_connect(|socket| async move {
                    println!("用户进入聊天室: {:?}", socket.id);
                })
                .on("join", |socket, room: String| async move {
                    // 加入指定房间
                    socket.join(room.clone()).unwrap();
                    socket.emit("joined", format!("你已加入房间: {}", room)).ok();
                    socket.to(room).emit("notification", format!("新用户加入了房间: {}", socket.id)).ok();
                })
                .on("message", |socket, (room, msg): (String, String)| async move {
                    // 向房间内所有用户发送消息
                    println!("收到来自房间 {} 的消息: {}", room, msg);
                    socket.to(room).emit("message", (socket.id.to_string(), msg)).ok();
                })
                .on("leave", |socket, room: String| async move {
                    // 离开房间
                    socket.leave(room.clone()).unwrap();
                    socket.emit("left", format!("你已离开房间: {}", room)).ok();
                    socket.to(room).emit("notification", format!("用户 {} 离开了房间", socket.id)).ok();
                })
        })
        // 管理员命名空间
        .ns("/admin", |socket| {
            socket
                .on_connect(|socket| async move {
                    println!("管理员连接: {:?}", socket.id);
                })
                .on("broadcast", |socket, msg: String| async move {
                    // 向所有连接广播消息
                    println!("管理员广播: {}", msg);
                    socket.broadcast().emit("admin_broadcast", msg).ok();
                })
        });

    // 启动服务器
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    println!("服务器运行在: http://localhost:3000");
    svc.listen(listener).await;
}

对应的客户端JavaScript代码:

const io = require("socket.io-client");

// 连接到公共命名空间
const publicSocket = io("http://localhost:3000");
publicSocket.on("welcome", (msg) => {
    console.log("公共空间消息:", msg);
});

// 连接到聊天命名空间
const chatSocket = io("http://localhost:3000/chat");
chatSocket.on("connect", () => {
    console.log("已连接到聊天服务器");
    
    // 加入房间
    chatSocket.emit("join", "general");
    
    // 发送消息
    chatSocket.emit("message", ["general", "Hello everyone!"]);
    
    // 接收消息
    chatSocket.on("message", ([senderId, msg]) => {
        console.log(`${senderId}: ${msg}`);
    });
    
    // 接收通知
    chatSocket.on("notification", (msg) => {
        console.log("通知:", msg);
    });
});

// 管理员连接示例
const adminSocket = io("http://localhost:3000/admin");
adminSocket.on("connect", () => {
    console.log("管理员已连接");
    adminSocket.emit("broadcast", "重要系统通知");
});

socketioxide-core 提供了强大的功能来构建实时应用程序,同时保持了Rust的高性能和安全性特点。

回到顶部