Rust HTTP与WebSocket双协议客户端库reqwest-websocket的使用,支持高效异步网络通信

Rust HTTP与WebSocket双协议客户端库reqwest-websocket的使用,支持高效异步网络通信

reqwest-websocketreqwest 的扩展库,允许建立 WebSocket 连接。该库提供了 RequestBuilderExt 扩展特性,为 reqwest::RequestBuilder 添加了 upgrade 方法,用于将 HTTP 连接升级为 WebSocket。

示例代码

以下是基础用法示例:

// 导入WebSocket升级扩展
use reqwest_websocket::RequestBuilderExt;

// 创建GET请求并升级为WebSocket
let response = Client::default()
    .get("wss://echo.websocket.org/")
    .upgrade() // 准备WebSocket升级
    .send()
    .await?;

// 将响应转换为WebSocket流
let mut websocket = response.into_websocket().await?;

// 发送文本消息
websocket.send(Message::Text("Hello, World".into())).await?;

// 接收消息
while let Some(message) = websocket.try_next().await? {
    if let Message::Text(text) = message {
        println!("received: {text}")
    }
}

完整示例DEMO

下面是一个完整的WebSocket客户端实现:

use reqwest::{Client, Error};
use reqwest_websocket::RequestBuilderExt;
use tokio_tungstenite::tungstenite::Message;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // 1. 初始化HTTP客户端
    let client = Client::new();

    // 2. 建立WebSocket连接
    let response = client
        .get("wss://echo.websocket.org/") // 使用WebSocket测试服务器
        .upgrade() // 请求升级协议
        .send() // 发送HTTP请求
        .await?;

    // 3. 转换为WebSocket流
    let mut websocket = response.into_websocket().await?;

    // 4. 发送测试消息
    websocket
        .send(Message::Text("Hello from Rust!".into()))
        .await?;

    // 5. 处理接收到的消息
    while let Some(message) = websocket.try_next().await? {
        match message {
            Message::Text(text) => println!("收到文本消息: {}", text),
            Message::Binary(bin) => println!("收到二进制消息: {:?}", bin),
            Message::Ping(_) => println!("收到Ping帧"),
            Message::Pong(_) => println!("收到Pong帧"),
            Message::Close(_) => {
                println!("连接关闭");
                break;
            }
        }
    }

    Ok(())
}

功能特点

  1. 协议升级:通过 upgrade() 方法实现HTTP到WebSocket的无缝升级
  2. 全双工通信:支持同时发送和接收消息
  3. 异步IO:基于Tokio运行时实现高效网络通信
  4. 消息类型:完整支持WebSocket协议中的各种消息帧类型

WebAssembly兼容性

在WebAssembly环境下,该库会自动回退到使用浏览器原生WebSocket API(web_sys::WebSocket),因为标准HTTP升级机制在浏览器环境中不可用。

依赖安装

在项目的Cargo.toml中添加以下依赖:

[dependencies]
reqwest-websocket = "0.5.0"
tokio = { version = "1.0", features = ["full"] }
tokio-tungstenite = "0.20.0"

授权许可

该项目采用MIT开源许可证发布。


1 回复

reqwest-websocket: Rust中的HTTP与WebSocket双协议客户端库

reqwest-websocket是一个基于reqwesttokio-tungstenite的Rust库,它允许在同一客户端中同时处理HTTP和WebSocket协议,支持高效的异步网络通信。

主要特性

  • 基于reqwest的HTTP客户端功能
  • 支持WebSocket协议通信
  • 异步/await语法支持
  • 与Tokio运行时无缝集成
  • 可配置的连接选项

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
reqwest-websocket = "0.4"
tokio = { version = "1.0", features = ["full"] }

基本HTTP请求示例

use reqwest_websocket::{RequestBuilderExt, WebSocketUpgrade};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 普通HTTP GET请求
    let response = reqwest::Client::new()
        .get("https://httpbin.org/get")
        .send()
        .await?;
    
    println!("HTTP Response: {:?}", response.text().await?);
    
    Ok(())
}

WebSocket连接示例

use reqwest_websocket::{RequestBuilderExt, WebSocketUpgrade};
use futures_util::{SinkExt, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 升级为WebSocket连接
    let (ws, _) = reqwest::Client::new()
        .get("wss://echo.websocket.org")
        .upgrade()
        .send()
        .await?
        .into_websocket()
        .await?;
    
    let (mut tx, mut rx) = ws.split();
    
    // 发送消息
    tx.send("Hello, WebSocket!".into()).await?;
    
    // 接收消息
    if let Some(msg) = rx.next().await {
        let msg = msg?;
        println!("Received: {:?}", msg);
    }
    
    Ok(())
}

同时处理HTTP和WebSocket

use reqwest_websocket::{RequestBuilderExt, WebSocketUpgrade};
use futures_util::{SinkExt, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    
    // 1. 先发送HTTP请求
    let http_resp = client.get("https://httpbin.org/get")
        .send()
        .await?;
    println!("HTTP Status: {}", http_resp.status());
    
    // 2. 然后建立WebSocket连接
    let (mut ws, _) = client.get("wss://echo.websocket.org")
        .upgrade()
        .send()
        .await?
        .into_websocket()
        .await?;
    
    ws.send("Ping".into()).await?;
    if let Some(msg) = ws.next().await {
        println!("WebSocket Reply: {:?}", msg?);
    }
    
    Ok(())
}

高级用法

自定义WebSocket配置

use reqwest_websocket::{RequestBuilderExt, WebSocketUpgrade};
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = WebSocketConfig {
        max_send_queue: Some(1024),
        max_message_size: Some(64 * 1024),
        max_frame_size: Some(32 * 1024),
        accept_unmasked_frames: false,
    };
    
    let (ws, _) = reqwest::Client::new()
        .get("wss://echo.websocket.org")
        .upgrade_with_config(config)
        .send()
        .await?
        .into_websocket()
        .await?;
    
    // 使用WebSocket...
    
    Ok(())
}

处理WebSocket错误

use reqwest_websocket::{RequestBuilderExt, WebSocketUpgrade};
use futures_util::{SinkExt, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut ws, response) = reqwest::Client::new()
        .get("wss://echo.websocket.org")
        .upgrade()
        .send()
        .await?
        .into_websocket()
        .await?;
    
    println!("WebSocket upgrade response: {:?}", response);
    
    match ws.send("Ping".into()).await {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => eprintln!("Failed to send message: {}", e),
    }
    
    while let Some(msg) = ws.next().await {
        match msg {
            Ok(msg) => println!("Received: {:?}", msg),
            Err(e) => eprintln!("Error receiving message: {}", e),
        }
    }
    
    Ok(())
}

完整示例代码

下面是一个完整的使用reqwest-websocket的示例,展示了如何同时处理HTTP请求和WebSocket连接:

use reqwest_websocket::{RequestBuilderExt, WebSocketUpgrade};
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建HTTP客户端
    let client = reqwest::Client::new();
    
    // 示例1: 基本HTTP请求
    println!("=== 发送HTTP GET请求 ===");
    let http_resp = client.get("https://httpbin.org/get")
        .send()
        .await?;
    println!("HTTP状态码: {}", http_resp.status());
    println!("HTTP响应体: {}", http_resp.text().await?);
    
    // 示例2: WebSocket连接
    println!("\n=== 建立WebSocket连接 ===");
    let (mut ws, ws_response) = client.get("wss://echo.websocket.org")
        .upgrade()
        .send()
        .await?
        .into_websocket()
        .await?;
    
    println!("WebSocket升级响应: {:?}", ws_response);
    
    // 发送消息
    ws.send("Hello from reqwest-websocket!".into()).await?;
    println!("已发送WebSocket消息");
    
    // 接收消息
    if let Some(msg) = ws.next().await {
        match msg {
            Ok(msg) => println!("收到WebSocket回复: {:?}", msg),
            Err(e) => eprintln!("接收消息错误: {}", e),
        }
    }
    
    // 示例3: 自定义WebSocket配置
    println!("\n=== 使用自定义配置建立WebSocket连接 ===");
    let config = WebSocketConfig {
        max_send_queue: Some(1024),
        max_message_size: Some(64 * 1024),
        max_frame_size: Some(32 * 1024),
        accept_unmasked_frames: false,
    };
    
    let (mut custom_ws, _) = client.get("wss://echo.websocket.org")
        .upgrade_with_config(config)
        .send()
        .await?
        .into_websocket()
        .await?;
    
    custom_ws.send("测试自定义配置".into()).await?;
    if let Some(msg) = custom_ws.next().await {
        println!("自定义配置连接收到: {:?}", msg?);
    }
    
    Ok(())
}

注意事项

  1. 确保使用tokio运行时,并启用full特性
  2. WebSocket URL应使用ws://wss://协议
  3. 服务器必须支持WebSocket协议升级
  4. 对于生产环境,应考虑添加适当的错误处理和重试逻辑

reqwest-websocket为Rust开发者提供了一个统一的接口来处理HTTP和WebSocket通信,特别适合需要同时与这两种协议交互的应用程序。

回到顶部