Rust实时通信库rust_engineio的使用,实现WebSocket和Engine.IO协议的高效客户端与服务器交互

以下是关于Rust实时通信库rust_engineio的使用内容,包含WebSocket和Engine.IO协议的高效客户端与服务器交互的完整示例:

Latest Version docs.rs

Rust-engineio-client

一个用Rust编程语言实现的engine.io客户端。该实现目前支持engine.io协议的第4版修订版。如果您在使用此客户端时遇到任何连接问题,请确保服务器至少使用engine.io协议的第4版修订版。

示例用法

use rust_engineio::{ClientBuilder, Client, packet::{Packet, PacketId}};
use url::Url;
use bytes::Bytes;

// 获取带有`on_open`回调的客户端
let client: Client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
     .on_open(|_| println!("Connection opened!"))
     .build()
     .expect("Connection failed");

// 连接到服务器
client.connect().expect("Connection failed");

// 创建一个数据包,这里是一个消息数据包并发送它
let packet = Packet::new(PacketId::Message, Bytes::from_static(b"Hello World"));
client.emit(packet).expect("Server unreachable");

// 从服务器断开连接
client.disconnect().expect("Disconnect failed")

完整示例代码

下面是一个更完整的示例,展示如何使用rust_engineio库实现WebSocket和Engine.IO协议的客户端与服务器交互:

use rust_engineio::{ClientBuilder, Client, packet::{Packet, PacketId}};
use url::Url;
use bytes::Bytes;
use std::time::Duration;

fn main() {
    // 创建客户端连接
    let client: Client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
        .on_open(|_| println!("连接已建立!"))
        .on_close(|_| println!("连接已关闭"))
        .on_error(|err| println!("发生错误: {:?}", err))
        .on_data(|data| println!("收到数据: {:?}", data))
        .reconnect_interval(Duration::from_secs(5))  // 设置重连间隔
        .build()
        .expect("客户端创建失败");

    // 连接到服务器
    client.connect().expect("连接失败");

    // 发送消息
    let message = Packet::new(PacketId::Message, Bytes::from_static(b"Hello Server"));
    client.emit(message).expect("消息发送失败");

    // 保持连接一段时间
    std::thread::sleep(Duration::from_secs(10));

    // 断开连接
    client.disconnect().expect("断开连接失败");
}

异步版本示例

要使用异步版本,需要在Cargo.toml中添加以下依赖:

[dependencies]
rust-engineio = { version = "0.6.0", features = ["async"] }
tokio = { version = "1.0", features = ["full"] }

然后可以使用以下异步代码:

use rust_engineio::asynchronous::{ClientBuilder, Client};
use url::Url;
use bytes::Bytes;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建异步客户端
    let client: Client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
        .on_open(|_| println!("连接已建立!"))
        .on_close(|_| println!("连接已关闭"))
        .on_error(|err| println!("发生错误: {:?}", err))
        .on_data(|data| println!("收到数据: {:?}", data))
        .reconnect_interval(Duration::from_secs(5))
        .build()
        .await
        .expect("客户端创建失败");

    // 异步连接
    client.connect().await.expect("连接失败");

    // 异步发送消息
    let message = Packet::new(PacketId::Message, Bytes::from_static(b"Hello Server"));
    client.emit(message).await.expect("消息发送失败");

    // 保持连接
    tokio::time::sleep(Duration::from_secs(10)).await;

    // 异步断开连接
    client.disconnect().await.expect("断开连接失败");
}

可用连接方法

该库提供了多种连接方法:

  1. build: 如果允许则建立WebSocket连接,否则回退到轮询。标准配置。
  2. build_polling: 强制使用轮询传输。
  3. build_websocket_with_upgrade: 先用轮询传输建立连接,然后升级到WebSocket传输(如果可能)。
  4. build_websocket: 仅用WebSocket传输建立连接,如果WebSocket不被允许则会崩溃。

当前功能

该实现支持engine.io协议的所有功能,包括:

  • 多种传输选项
  • 发送engine.io数据包的能力
  • 注册常见的engine.io事件回调:
    • on_open
    • on_close
    • on_data
    • on_error
    • on_packet

还可以通过TlsConnector传入自定义TLS配置,以及为初始请求提供自定义头信息。

完整示例Demo

以下是一个完整的实时聊天应用示例,结合了同步和异步版本的最佳实践:

// 同步版本完整示例
use rust_engineio::{ClientBuilder, Client, packet::{Packet, PacketId}};
use url::Url;
use bytes::Bytes;
use std::time::Duration;
use std::io;

fn main() -> io::Result<()> {
    println!("正在启动Engine.IO客户端...");
    
    // 创建客户端连接
    let client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
        .on_open(|_| println!("✅ 已连接到服务器"))
        .on_close(|_| println!("❌ 连接已关闭"))
        .on_error(|err| eprintln!("⚠️ 连接错误: {:?}", err))
        .on_data(|data| {
            if let Packet::Message(msg) = data {
                println!("📩 收到消息: {}", String::from_utf8_lossy(&msg));
            }
        })
        .reconnect_interval(Duration::from_secs(3))
        .build()?;

    // 连接到服务器
    client.connect()?;
    
    // 发送欢迎消息
    let welcome = Packet::new(PacketId::Message, Bytes::from("用户已加入聊天"));
    client.emit(welcome)?;
    
    // 创建消息循环
    loop {
        let mut input = String::new();
        io::stdin().read_line(&mut input)?;
        
        if input.trim().eq_ignore_ascii_case("exit") {
            break;
        }
        
        let packet = Packet::new(PacketId::Message, Bytes::from(input.trim()));
        client.emit(packet)?;
    }
    
    // 断开连接
    let goodbye = Packet::new(PacketId::Message, Bytes::from("用户已离开聊天"));
    client.emit(goodbye)?;
    client.disconnect()?;
    
    Ok(())
}
// 异步版本完整示例
use rust_engineio::asynchronous::{ClientBuilder, Client};
use url::Url;
use bytes::Bytes;
use std::time::Duration;
use tokio::io::{self, AsyncBufReadExt, BufReader};

#[tokio::main]
async fn main() -> io::Result<()> {
    println!("正在启动异步Engine.IO客户端...");
    
    // 创建异步客户端
    let client = ClientBuilder::new(Url::parse("http://localhost:4201").unwrap())
        .on_open(|_| println!("✅ 已连接到服务器"))
        .on_close(|_| println!("❌ 连接已关闭"))
        .on_error(|err| eprintln!("⚠️ 连接错误: {:?}", err))
        .on_data(|data| {
            if let Packet::Message(msg) = data {
                println!("📩 收到消息: {}", String::from_utf8_lossy(&msg));
            }
        })
        .reconnect_interval(Duration::from_secs(3))
        .build()
        .await?;

    // 异步连接
    client.connect().await?;
    
    // 发送欢迎消息
    let welcome = Packet::new(PacketId::Message, Bytes::from("用户已加入聊天"));
    client.emit(welcome).await?;
    
    // 创建异步消息循环
    let stdin = BufReader::new(io::stdin());
    let mut lines = stdin.lines();
    
    while let Ok(Some(line)) = lines.next_line().await {
        if line.trim().eq_ignore_ascii_case("exit") {
            break;
        }
        
        let packet = Packet::new(PacketId::Message, Bytes::from(line.trim()));
        client.emit(packet).await?;
    }
    
    // 断开连接
    let goodbye = Packet::new(PacketId::Message, Bytes::from("用户已离开聊天"));
    client.emit(goodbye).await?;
    client.disconnect().await?;
    
    Ok(())
}

1 回复

Rust实时通信库rust_engineio的使用指南

概述

rust_engineio是一个实现了Engine.IO协议的Rust库,支持WebSocket和轮询传输,可用于构建实时通信的客户端和服务器应用。Engine.IO是Socket.IO的基础协议,提供了可靠的实时双向通信能力。

主要特性

  • 支持Engine.IO协议v3和v4
  • 同时支持WebSocket和HTTP轮询传输
  • 异步实现(基于tokio)
  • 客户端和服务器端实现
  • 消息和二进制数据支持

安装

在Cargo.toml中添加依赖:

[dependencies]
rust_engineio = "0.4"
tokio = { version = "1.0", features = ["full"] }

完整示例代码

服务器端完整示例

use rust_engineio::{Server, ServerBuilder, Socket, Packet};
use futures::StreamExt;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 创建Engine.IO服务器
    let mut server = ServerBuilder::new()
        .ping_interval(Duration::from_secs(5))
        .ping_timeout(Duration::from_secs(10))
        .max_payload(1_000_000)
        .build();

    // 处理新连接
    let mut new_connections = server.sockets();
    
    tokio::spawn(async move {
        while let Some(socket) = new_connections.next().await {
            println!("New client connected: {:?}", socket.id);
            handle_connection(socket).await;
        }
    });

    // 启动服务器
    server.bind("127.0.0.1:3000").await.unwrap();
    println!("Server started on 127.0.0.1:3000");

    // 保持服务器运行
    loop {
        sleep(Duration::from_secs(10)).await;
    }
}

async fn handle_connection(socket: Socket) {
    let (mut sender, mut receiver) = socket.split();

    // 处理接收到的消息
    tokio::spawn(async move {
        while let Some(packet) = receiver.next().await {
            match packet {
                Ok(packet) => {
                    match packet {
                        Packet::Message(text) => {
                            println!("Received message: {}", text);
                            // 处理自定义事件
                            if text.starts_with("event:") {
                                let event_data = &text[6..];
                                let response = format!("event_response:{}", event_data);
                                sender.send(Packet::Message(response)).await.unwrap();
                            } else {
                                // 回声消息
                                sender.send(Packet::Message(text)).await.unwrap();
                            }
                        }
                        Packet::Binary(data) => {
                            println!("Received binary data: {:?} bytes", data.len());
                            // 回声二进制数据
                            sender.send(Packet::Binary(data)).await.unwrap();
                        }
                        _ => {}
                    }
                }
                Err(e) => {
                    eprintln!("Error receiving packet: {}", e);
                    break;
                }
            }
        }
        println!("Client disconnected");
    });
}

客户端完整示例

use rust_engineio::{Client, ClientBuilder, Packet};
use futures::StreamExt;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 创建Engine.IO客户端
    let mut client = ClientBuilder::new("http://127.0.0.1:3000")
        .ping_interval(Duration::from_secs(5))
        .ping_timeout(Duration::from_secs(10))
        .reconnection(true)
        .reconnection_attempts(5)
        .reconnection_delay(1000)
        .connect()
        .await
        .expect("Failed to connect to server");

    println!("Connected to server");

    // 发送文本消息
    client.emit("Hello from client!".into()).await.unwrap();

    // 发送自定义事件
    client.emit("event:custom_event_data".into()).await.unwrap();

    // 发送二进制数据
    let binary_data = vec![0x01, 0x02, 0x03, 0x04];
    client.emit_binary(binary_data.into()).await.unwrap();

    // 接收消息
    tokio::spawn(async move {
        while let Some(packet) = client.next().await {
            match packet {
                Ok(packet) => {
                    match packet {
                        Packet::Message(text) => println!("Received message: {}", text),
                        Packet::Binary(data) => println!("Received binary: {:?}", data),
                        _ => {}
                    }
                }
                Err(e) => {
                    eprintln!("Error receiving packet: {}", e);
                    break;
                }
            }
        }
    });

    // 保持客户端运行
    loop {
        sleep(Duration::from_secs(5)).await;
        // 定期发送心跳消息
        client.emit("heartbeat".into()).await.unwrap();
    }
}

配置选项

ServerBuilder和ClientBuilder提供的主要配置选项:

// 服务器配置示例
let server = ServerBuilder::new()
    .ping_interval(Duration::from_secs(5))  // 心跳间隔
    .ping_timeout(Duration::from_secs(10))  // 心跳超时
    .max_payload(1_000_000)                // 最大负载大小(字节)
    .allow_upgrades(true)                  // 是否允许传输升级
    .build();

// 客户端配置示例
let client = ClientBuilder::new("http://127.0.0.1:3000")
    .reconnection(true)                    // 是否自动重连
    .reconnection_attempts(5)              // 最大重连尝试次数
    .reconnection_delay(1000)              // 重连延迟(毫秒)
    .build();

错误处理建议

// 连接错误处理
match client.connect().await {
    Ok(client) => {
        // 连接成功处理
    }
    Err(e) => {
        eprintln!("Connection failed: {}", e);
        // 实现重连逻辑
    }
}

// 消息发送错误处理
if let Err(e) = client.emit("message".into()).await {
    eprintln!("Failed to send message: {}", e);
}

性能优化建议

  1. 对于高吞吐量场景,优先使用WebSocket传输模式
  2. 合理设置心跳间隔(通常5-25秒)和超时时间(心跳间隔的2-3倍)
  3. 对于大量小消息,考虑批量发送以减少协议开销
  4. 二进制数据比文本数据更高效,特别是对于非文本内容
  5. 在高并发场景下,考虑使用连接池管理客户端连接
回到顶部