Rust WebSocket协议解析库socketioxide-parser-common的使用,支持高效Socket.IO消息解析与协议处理

Rust WebSocket协议解析库socketioxide-parser-common的使用

Socketioxide是一个Rust实现的Socket.IO服务器,它能与Tower生态系统和Tokio栈集成。它可以与任何基于tower的服务器框架集成,如Axum、Warp、Salvo、Viz或Hyper。

主要特性

  • 与多种框架集成:
    • Axum、Hyper、Salvo、Viz等
  • 支持各种基于tower的中间件:
    • CORS、压缩、授权等
  • 可扩展的适配器:
    • Redis/Valkey、MongoDB等
  • 远程集群通信
  • 命名空间和动态命名空间
  • 房间功能
  • 确认和带确认的发送
  • 二进制数据包
  • 轮询和WebSocket传输
  • 常用和Msgpack解析器
  • 扩展功能为socket添加自定义数据
  • 高效的内存HTTP负载解析
  • 类似Axum的API处理事件
  • 支持所有Socket.IO版本

示例代码

聊天应用示例

io.ns("/", |s: SocketRef| {
    s.on("new message", |s: SocketRef, Data::<String>(msg)| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        let msg = Res::Message {
            username,
            message: msg,
        };
        s.broadcast().emit("new message", msg).ok();
    });

    s.on(
        "add user",
        |s: SocketRef, Data::<String>(username), user_cnt: State<UserCnt>| {
            if s.extensions.get::<Username>().is_some() {
                return;
            }
            let num_users = user_cnt.add_user();
            s.extensions.insert(Username(username.clone()));
            s.emit("login", Res::Login { num_users }).ok();

            let res = Res::UserEvent {
                num_users,
                username: Username(username),
            };
            s.broadcast().emit("user joined", res).ok();
        },
    );

    s.on("typing", |s: SocketRef| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        s.broadcast()
            .emit("typing", Res::Username { username })
            .ok();
    });

    s.on("stop typing", |s: SocketRef| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        s.broadcast()
            .emit("stop typing", Res::Username { username })
            .ok();
    });

    s.on_disconnect(|s: SocketRef, user_cnt: State<UserCnt>| {
        if let Some(username) = s.extensions.get::<Username>() {
            let num_users = user_cnt.remove_user();
            let res = Res::UserEvent {
                num_users,
                username: username.clone(),
            };
            s.b广播.emit("user left", res).ok();
        }
    });
});

Axum实现的Echo示例

use axum::routing::get;
use serde_json::Value;
use socketioxide::{
    extract::{AckSender, Bin, Data, SocketRef},
    SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;

fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
    info!("Socket.IO connected: {:?} {:?}", socket.ns(), socket.id);
    socket.emit("auth", data).ok();

    socket.on(
        "message",
        |socket: SocketRef, Data::<Value>(data), Bin(bin)| {
            info!("Received event: {:?} {:?}", data, bin);
            socket.bin(bin).emit("message-back", data).ok();
        },
    );

    socket.on(
        "message-with-ack",
        |Data::<Value>(data), ack: AckSender, Bin(bin)| {
            info!("Received event: {:?} {:?}", data, bin);
            ack.bin(bin).send(data).ok();
        },
    );
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing::subscriber::set_global_default(FmtSubscriber::default())?;

    let (layer, io) = SocketIo::new_layer();

    io.ns("/", on_connect);
    io.ns("/custom", on_connect);

    let app = axum::Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .layer(layer);

    info!("Starting server");

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();

    Ok(())
}

完整示例

以下是一个完整的使用socketioxide-parser-common构建WebSocket聊天室的示例:

use axum::{routing::get, Router};
use serde::{Deserialize, Serialize};
use socketioxide::{
    extract::{Data, SocketRef, State},
    SocketIo,
};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info;
use tracing_subscriber::FmtSubscriber;

// 用户消息结构体
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ChatMessage {
    username: String,
    content: String,
}

// 用户连接状态
#[derive(Debug, Default)]
struct UserState {
    count: usize,
    users: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 初始化日志
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // 创建共享状态
    let state = Arc::new(Mutex::new(UserState::default()));

    // 创建Socket.IO层
    let (layer, io) = SocketIo::new_layer();

    // 事件处理
    io.ns("/", move |socket: SocketRef| {
        let state = state.clone();
        
        // 连接事件
        info!("新用户连接: {:?}", socket.id);

        // 处理登录事件
        socket.on("login", move |socket: SocketRef, Data::<String>(username)| {
            let mut state = state.blocking_lock();
            state.count += 1;
            state.users.push(username.clone());
            
            info!("用户登录: {}", username);
            
            // 广播新用户加入
            socket.broadcast().emit("user_joined", username).ok();
            
            // 返回当前在线用户数
            socket.emit("login_success", state.count).ok();
        });

        // 处理聊天消息
        socket.on("chat_message", |socket: SocketRef, Data::<ChatMessage>(msg)| {
            info!("收到消息: {:?}", msg);
            // 广播消息给所有用户
            socket.broadcast().emit("new_message", msg).ok();
        });

        // 处理断开连接
        socket.on_disconnect(move |socket: SocketRef| {
            let mut state = state.blocking_lock();
            state.count -= 1;
            info!("用户断开连接: {:?}", socket.id);
        });
    });

    // 设置路由
    let app = Router::new()
        .route("/", get(|| async { "WebSocket 聊天服务器" }))
        .layer(layer);

    // 启动服务器
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    info!("服务器运行在: http://localhost:3000");
    axum::serve(listener, app).await?;

    Ok(())
}

安装

在项目目录中运行以下Cargo命令:

cargo add socketioxide-parser-common

或者在Cargo.toml中添加:

socketioxide-parser-common = "0.17.0"

许可证

该项目使用MIT许可证。


1 回复

Rust WebSocket协议解析库socketioxide-parser-common使用指南

概述

socketioxide-parser-common是一个专注于Socket.IO协议解析的Rust库,提供了高效的消息解析和协议处理功能。它专门用于处理WebSocket通信中的Socket.IO协议,适合构建实时应用程序。

主要特性

  • 高效的Socket.IO消息解析
  • 支持Socket.IO协议版本4
  • 低开销的消息处理
  • 类型安全的API设计
  • 良好的错误处理机制

安装

在Cargo.toml中添加依赖:

[dependencies]
socketioxide-parser-common = "0.1"  # 请使用最新版本

完整示例代码

use socketioxide_parser_common::{decode, encode, Packet, PacketType, ParseError};
use bytes::Bytes;

fn main() {
    // 示例1: 解析Socket.IO消息
    let packet = decode(r#"42["chat message","hello world"]"#).unwrap();
    match packet {
        Packet::Event(_, data) => {
            println!("[示例1] 收到事件数据: {:?}", data);
        }
        _ => println!("[示例1] 收到其他类型的数据包"),
    }

    // 示例2: 创建和编码Socket.IO消息
    let packet = Packet::Event(
        "/".to_string(), 
        vec!["chat message".into(), "hello from rust".into()]
    );
    let encoded = encode(&packet).unwrap();
    println!("[示例2] 编码后的数据包: {}", encoded);

    // 示例3: 处理不同类型的数据包
    let packets = [
        r#"0{"sid":"abc123"}"#,  // 连接
        r#"1"#,                 // 断开连接
        r#"2["ping"]"#,         // 事件
        r#"3["pong"]"#,         // 确认
        r#"4"#,                 // 错误
    ];

    for msg in packets {
        let packet = decode(msg).unwrap();
        match packet {
            Packet::Connect(_, sid) => println!("[示例3] 已连接, sid: {}", sid),
            Packet::Disconnect(_) => println!("[示例3] 已断开连接"),
            Packet::Event(_, data) => println!("[示例3] 收到事件: {:?}", data),
            Packet::Ack(_, data) => println!("[示例3] 收到确认: {:?}", data),
            Packet::Error(_, reason) => println!("[示例3] 错误: {:?}", reason),
            _ => println!("[示例3] 其他类型的数据包"),
        }
    }

    // 示例4: 自定义命名空间处理
    let msg = r#"/admin,42["user joined",{"id":123}]"#;
    let packet = decode(msg).unwrap();
    if let Packet::Event(ns, data) = packet {
        println!("[示例4] 命名空间: {}, 数据: {:?}", ns, data);
    }

    // 示例5: 二进制数据支持
    let binary_data = Bytes::from(vec![1, 2, 3, 4]);
    let packet = Packet::BinaryEvent(
        "/".to_string(),
        vec!["image".into(), binary_data.into()]
    );
    let encoded = encode(&packet).unwrap();
    println!("[示例5] 编码后的二进制数据包: {}", encoded);
    let decoded = decode(&encoded).unwrap();
    println!("[示例5] 解码后的数据包: {:?}", decoded);

    // 示例6: 错误处理
    match decode("invalid message") {
        Ok(packet) => println!("[示例6] 有效数据包: {:?}", packet),
        Err(ParseError::InvalidPacketFormat) => println!("[示例6] 无效的消息格式"),
        Err(ParseError::InvalidPacketType) => println!("[示例6] 无效的数据包类型"),
        Err(e) => println!("[示例6] 其他错误: {:?}", e),
    }
}

性能建议

  1. 对于高频消息场景,重用解码器实例
  2. 预分配缓冲区处理大量消息
  3. 考虑使用零拷贝技术处理二进制数据

总结

socketioxide-parser-common提供了简单而强大的API来处理Socket.IO协议,使Rust开发者能够轻松构建实时WebSocket应用。通过类型安全的解析和编码功能,它可以有效减少协议处理中的错误,同时保持高性能。

回到顶部