Rust WebSocket客户端库reconnecting-jsonrpsee-ws-client的使用:自动重连的JSON-RPC WebSocket实现

reconnecting-jsonrpsee-ws-client 使用指南

这是一个基于jsonrpsee ws客户端的包装库,能够自动在后台重新连接;如果没有这个功能,用户需要手动重启连接。它支持几种重试策略,比如指数退避,同时也支持自定义策略,只要该策略实现了Iterator<Item = Duration>

核心功能

默认情况下,库会自动重新传输待处理的调用,并在连接中断时重新建立被关闭的订阅,但也可以禁用这个功能,自行管理。

自定义重试策略示例

let mut sub = client
    .subscribe_with_policy(
        "subscribe_lo".to_string(),
        rpc_params![],
        "unsubscribe_lo".to_string(),
        // 如果连接关闭,不要重新订阅。
        CallRetryPolicy::Retry,
    )
    .await
    .unwrap();

监控重连时间

// 打印RPC客户端开始重新连接的时间。
loop {
    rpc.reconnect_started().await;
    let now = std::time::Instant::now();
    rpc.reconnected().await;
    println!(
        "RPC client reconnection took `{} seconds`",
        now.elapsed().as_secs()
    );
}

完整示例代码

下面是一个完整的客户端实现示例:

use reconnecting_jsonrpsee_ws_client::{rpc_params, Client, ExponentialBackoff, PingConfig, CallRetryPolicy};
use std::time::Duration;
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. 创建客户端配置
    let client = Client::builder()
        // 设置重试策略:100ms初始延迟的指数退避,最多重试10次
        .retry_policy(ExponentialBackoff::from_millis(100).take(10))
        // 启用WebSocket ping/pong保持连接活跃
        .enable_ws_ping(
            PingConfig::new()
                .ping_interval(Duration::from_secs(6))
                .inactive_limit(Duration::from_secs(30)),
        )
        // 连接到WebSocket服务器
        .build("ws://localhost:9944".to_string())
        .await?;

    // 2. 监控重连事件(可选)
    let reconnect_monitor = {
        let rpc = client.clone();
        tokio::spawn(async move {
            loop {
                rpc.reconnect_started().await;
                let start = std::time::Instant::now();
                rpc.reconnected().await;
                println!("Reconnected in {}ms", start.elapsed().as_millis());
            }
        })
    };

    // 3. 执行RPC请求
    let version: String = client
        .request("system_version".to_string(), rpc_params![])
        .await?;
    println!("Server version: {}", version);

    // 4. 创建带重试策略的订阅
    let mut blocks = client
        .subscribe_with_policy(
            "chain_subscribeNewHeads".to_string(),
            rpc_params![],
            "chain_unsubscribeNewHeads".to_string(),
            CallRetryPolicy::Retry, // 自动重试订阅
        )
        .await?;

    // 5. 处理订阅消息
    while let Some(notification) = blocks.next().await {
        match notification {
            Ok(block) => println!("New block: {:?}", block),
            Err(e) => eprintln!("Subscription error: {}", e),
        }
    }

    // 取消监控任务
    reconnect_monitor.abort();
    Ok(())
}

关键点说明

  1. 重连策略:支持指数退避(ExponentialBackoff)和自定义策略
  2. 连接保持:通过PingConfig配置心跳检测
  3. 订阅管理
    • 默认自动重新订阅
    • 可通过CallRetryPolicy::NoRetry禁用自动重试
  4. 监控能力:提供reconnect_started()和reconnected()方法监控重连过程

注意事项

  • 重新连接时可能会丢失部分订阅通知
  • 对于不能丢失通知的关键场景,不建议使用此库
  • 实际使用时需要根据RPC服务器调整方法名和参数

1 回复

reconnecting-jsonrpsee-ws-client 使用指南

介绍

reconnecting-jsonrpsee-ws-client 是一个 Rust 库,提供了自动重连功能的 JSON-RPC WebSocket 客户端实现。它基于 jsonrpsee 库构建,主要解决了 WebSocket 连接不稳定时的自动恢复问题。

主要特性:

  • 自动处理 WebSocket 连接断开和重连
  • 内置 JSON-RPC 2.0 协议支持
  • 提供异步 API 接口
  • 可配置的重连策略

安装

Cargo.toml 中添加依赖:

[dependencies]
reconnecting-jsonrpsee-ws-client = "0.1"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

创建客户端

use reconnecting_jsonrpsee_ws_client::ClientBuilder;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = ClientBuilder::default()
        .build("wss://example.com/ws")
        .await?;
    
    Ok(())
}

发送 JSON-RPC 请求

let response: String = client.request("get_status", None).await?;
println!("Server status: {}", response);

处理通知

use jsonrpsee::core::client::Subscription;

let mut subscription: Subscription<String> = client.subscribe("subscribe_updates", None, "updates").await?;

while let Some(update) = subscription.next().await {
    println!("Received update: {:?}", update);
}

高级配置

自定义重连策略

use std::time::Duration;
use reconnecting_jsonrpsee-ws-client::ReconnectOptions;

let options = ReconnectOptions::new()
    .with_retry_policy(ReconnectOptions::retry_policy().with_max_retries(10))
    .with_delay(Duration::from_secs(5));

let client = ClientBuilder::default()
    .set_reconnect_options(options)
    .build("wss://example.com/ws")
    .await?;

自定义请求超时

use std::time::Duration;

let client = ClientBuilder::default()
    .request_timeout(Duration::from_secs(30))
    .build("wss://example.com/ws")
    .await?;

完整示例

以下是基于上述内容的完整示例代码:

use reconnecting_jsonrpsee_ws_client::ClientBuilder;
use jsonrpsee::core::client::Subscription;
use std::time::Duration;
use anyhow::Context;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 创建带有自定义配置的客户端
    let client = ClientBuilder::default()
        .request_timeout(Duration::from_secs(10))
        // 配置重连策略:最大重试10次,每次间隔5秒
        .set_reconnect_options(
            ReconnectOptions::new()
                .with_retry_policy(ReconnectOptions::retry_policy().with_max_retries(10))
                .with_delay(Duration::from_secs(5))
        )
        .build("wss://example.com/ws")
        .await
        .context("Failed to create WebSocket client")?;

    // 发送请求获取服务器状态
    match client.request("get_status", None).await {
        Ok(status) => println!("Server status: {}", status),
        Err(e) => eprintln!("Failed to get server status: {}", e),
    }

    // 发送请求获取服务器版本
    let version: String = client.request("get_version", None).await?;
    println!("Server version: {}", version);

    // 订阅日志通知
    let mut sub: Subscription<String> = client.subscribe("subscribe_logs", None, "logs").await?;
    
    // 在单独的tokio任务中处理订阅消息
    tokio::spawn(async move {
        while let Some(log) = sub.next().await {
            match log {
                Ok(log) => println!("New log entry: {}", log),
                Err(e) => eprintln!("Error in subscription: {}", e),
            }
        }
    });

    // 保持主线程运行,直到收到Ctrl+C信号
    tokio::signal::ctrl_c().await?;
    println!("Shutting down...");
    
    Ok(())
}

错误处理

该库返回的错误类型实现了 std::error::Error trait,可以使用 anyhowthiserror 等库进行错误处理。

match client.request("some_method", None).await {
    Ok(result) => println!("Success: {:?}", result),
    Err(e) => eprintln!("Error: {}", e),
}

注意事项

  1. 确保 Tokio 运行时已正确初始化
  2. WebSocket URL 应以 ws://wss:// 开头
  3. 对于生产环境,建议配置合理的重连策略和超时时间
  4. 订阅处理通常需要在单独的 Tokio 任务中运行
  5. 使用 anyhowthiserror 可以更好地处理嵌套错误

这个库特别适合需要长期保持 WebSocket 连接并处理 JSON-RPC 协议的场景,如区块链节点连接、实时数据监控等应用。

回到顶部