Rust WebSocket客户端库reconnecting-jsonrpsee-ws-client的使用:自动重连的JSON-RPC WebSocket实现
reconnecting-jsonrpsee-ws-client 使用指南
这是一个基于jsonrpsee ws客户端的包装库,能够自动在后台重新连接;如果没有这个功能,用户需要手动重启连接。它支持几种重试策略,比如指数退避,同时也支持自定义策略,只要该策略实现了Iterator<Item = Duration>
。
核心功能
默认情况下,库会自动重新传输待处理的调用,并在连接中断时重新建立被关闭的订阅,但也可以禁用这个功能,自行管理。
自定义重试策略示例
let mut sub = client
.subscribe_with_policy(
"subscribe_lo".to_string(),
rpc_params![],
"unsubscribe_lo".to_string(),
// 如果连接关闭,不要重新订阅。
CallRetryPolicy::Retry,
)
.await
.unwrap();
监控重连时间
// 打印RPC客户端开始重新连接的时间。
loop {
rpc.reconnect_started().await;
let now = std::time::Instant::now();
rpc.reconnected().await;
println!(
"RPC client reconnection took `{} seconds`",
now.elapsed().as_secs()
);
}
完整示例代码
下面是一个完整的客户端实现示例:
use reconnecting_jsonrpsee_ws_client::{rpc_params, Client, ExponentialBackoff, PingConfig, CallRetryPolicy};
use std::time::Duration;
use futures::StreamExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 1. 创建客户端配置
let client = Client::builder()
// 设置重试策略:100ms初始延迟的指数退避,最多重试10次
.retry_policy(ExponentialBackoff::from_millis(100).take(10))
// 启用WebSocket ping/pong保持连接活跃
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(6))
.inactive_limit(Duration::from_secs(30)),
)
// 连接到WebSocket服务器
.build("ws://localhost:9944".to_string())
.await?;
// 2. 监控重连事件(可选)
let reconnect_monitor = {
let rpc = client.clone();
tokio::spawn(async move {
loop {
rpc.reconnect_started().await;
let start = std::time::Instant::now();
rpc.reconnected().await;
println!("Reconnected in {}ms", start.elapsed().as_millis());
}
})
};
// 3. 执行RPC请求
let version: String = client
.request("system_version".to_string(), rpc_params![])
.await?;
println!("Server version: {}", version);
// 4. 创建带重试策略的订阅
let mut blocks = client
.subscribe_with_policy(
"chain_subscribeNewHeads".to_string(),
rpc_params![],
"chain_unsubscribeNewHeads".to_string(),
CallRetryPolicy::Retry, // 自动重试订阅
)
.await?;
// 5. 处理订阅消息
while let Some(notification) = blocks.next().await {
match notification {
Ok(block) => println!("New block: {:?}", block),
Err(e) => eprintln!("Subscription error: {}", e),
}
}
// 取消监控任务
reconnect_monitor.abort();
Ok(())
}
关键点说明
- 重连策略:支持指数退避(ExponentialBackoff)和自定义策略
- 连接保持:通过PingConfig配置心跳检测
- 订阅管理:
- 默认自动重新订阅
- 可通过CallRetryPolicy::NoRetry禁用自动重试
- 监控能力:提供reconnect_started()和reconnected()方法监控重连过程
注意事项
- 重新连接时可能会丢失部分订阅通知
- 对于不能丢失通知的关键场景,不建议使用此库
- 实际使用时需要根据RPC服务器调整方法名和参数
1 回复
reconnecting-jsonrpsee-ws-client 使用指南
介绍
reconnecting-jsonrpsee-ws-client
是一个 Rust 库,提供了自动重连功能的 JSON-RPC WebSocket 客户端实现。它基于 jsonrpsee
库构建,主要解决了 WebSocket 连接不稳定时的自动恢复问题。
主要特性:
- 自动处理 WebSocket 连接断开和重连
- 内置 JSON-RPC 2.0 协议支持
- 提供异步 API 接口
- 可配置的重连策略
安装
在 Cargo.toml
中添加依赖:
[dependencies]
reconnecting-jsonrpsee-ws-client = "0.1"
tokio = { version = "1.0", features = ["full"] }
基本使用方法
创建客户端
use reconnecting_jsonrpsee_ws_client::ClientBuilder;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let client = ClientBuilder::default()
.build("wss://example.com/ws")
.await?;
Ok(())
}
发送 JSON-RPC 请求
let response: String = client.request("get_status", None).await?;
println!("Server status: {}", response);
处理通知
use jsonrpsee::core::client::Subscription;
let mut subscription: Subscription<String> = client.subscribe("subscribe_updates", None, "updates").await?;
while let Some(update) = subscription.next().await {
println!("Received update: {:?}", update);
}
高级配置
自定义重连策略
use std::time::Duration;
use reconnecting_jsonrpsee-ws-client::ReconnectOptions;
let options = ReconnectOptions::new()
.with_retry_policy(ReconnectOptions::retry_policy().with_max_retries(10))
.with_delay(Duration::from_secs(5));
let client = ClientBuilder::default()
.set_reconnect_options(options)
.build("wss://example.com/ws")
.await?;
自定义请求超时
use std::time::Duration;
let client = ClientBuilder::default()
.request_timeout(Duration::from_secs(30))
.build("wss://example.com/ws")
.await?;
完整示例
以下是基于上述内容的完整示例代码:
use reconnecting_jsonrpsee_ws_client::ClientBuilder;
use jsonrpsee::core::client::Subscription;
use std::time::Duration;
use anyhow::Context;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 创建带有自定义配置的客户端
let client = ClientBuilder::default()
.request_timeout(Duration::from_secs(10))
// 配置重连策略:最大重试10次,每次间隔5秒
.set_reconnect_options(
ReconnectOptions::new()
.with_retry_policy(ReconnectOptions::retry_policy().with_max_retries(10))
.with_delay(Duration::from_secs(5))
)
.build("wss://example.com/ws")
.await
.context("Failed to create WebSocket client")?;
// 发送请求获取服务器状态
match client.request("get_status", None).await {
Ok(status) => println!("Server status: {}", status),
Err(e) => eprintln!("Failed to get server status: {}", e),
}
// 发送请求获取服务器版本
let version: String = client.request("get_version", None).await?;
println!("Server version: {}", version);
// 订阅日志通知
let mut sub: Subscription<String> = client.subscribe("subscribe_logs", None, "logs").await?;
// 在单独的tokio任务中处理订阅消息
tokio::spawn(async move {
while let Some(log) = sub.next().await {
match log {
Ok(log) => println!("New log entry: {}", log),
Err(e) => eprintln!("Error in subscription: {}", e),
}
}
});
// 保持主线程运行,直到收到Ctrl+C信号
tokio::signal::ctrl_c().await?;
println!("Shutting down...");
Ok(())
}
错误处理
该库返回的错误类型实现了 std::error::Error
trait,可以使用 anyhow
或 thiserror
等库进行错误处理。
match client.request("some_method", None).await {
Ok(result) => println!("Success: {:?}", result),
Err(e) => eprintln!("Error: {}", e),
}
注意事项
- 确保 Tokio 运行时已正确初始化
- WebSocket URL 应以
ws://
或wss://
开头 - 对于生产环境,建议配置合理的重连策略和超时时间
- 订阅处理通常需要在单独的 Tokio 任务中运行
- 使用
anyhow
或thiserror
可以更好地处理嵌套错误
这个库特别适合需要长期保持 WebSocket 连接并处理 JSON-RPC 协议的场景,如区块链节点连接、实时数据监控等应用。