Rust WebSocket服务器库jsonrpsee-ws-server的使用:轻量级JSON-RPC 2.0协议实现与高性能WebSocket通信
Rust WebSocket服务器库jsonrpsee-ws-server的使用:轻量级JSON-RPC 2.0协议实现与高性能WebSocket通信
jsonrpsee-ws-server是一个Rust实现的轻量级WebSocket服务器库,支持JSON-RPC 2.0协议,提供高性能的WebSocket通信能力。
安装
在项目目录中运行以下Cargo命令:
cargo add jsonrpsee-ws-server
或者在Cargo.toml中添加:
jsonrpsee-ws-server = "0.15.1"
完整示例代码
以下是使用jsonrpsee-ws-server创建WebSocket JSON-RPC服务器的完整示例:
use jsonrpsee_ws_server::{WsServerBuilder, WsServerHandle};
use serde_json::Value;
use std::net::SocketAddr;
// 定义RPC方法
async fn echo(params: Vec<Value>) -> Result<Value, jsonrpsee_core::Error> {
// 简单返回第一个参数
params.first().cloned().ok_or_else(|| jsonrpsee_core::Error::invalid_params("No parameters provided"))
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 创建服务器构建器
let server = WsServerBuilder::default()
.build("127.0.0.1:9944".parse::<SocketAddr>()?)
.await?;
// 注册RPC方法
let mut module = jsonrpsee_ws_server::RpcModule::new();
module.register_method("echo", echo)?;
// 启动服务器
let server_handle = server.start(module)?;
println!("Server started at ws://127.0.0.1:9944");
// 等待服务器关闭
server_handle.stopped().await;
Ok(())
}
使用说明
- 首先创建
WsServerBuilder
实例并配置服务器地址 - 创建
RpcModule
并注册RPC方法 - 启动服务器并获取
WsServerHandle
- 使用
stopped()
方法等待服务器停止
特性
- 轻量级JSON-RPC 2.0协议实现
- 高性能WebSocket通信
- 异步支持
- 易于使用的API
- 支持自定义RPC方法
完整示例demo
以下是基于jsonrpsee-ws-server的扩展示例,演示了多个RPC方法的注册和使用:
use jsonrpsee_ws_server::{WsServerBuilder, WsServerHandle};
use serde_json::{Value, json};
use std::net::SocketAddr;
// 定义echo方法
async fn echo(params: Vec<Value>) -> Result<Value, jsonrpsee_core::Error> {
params.first().cloned().ok_or_else(|| jsonrpsee_core::Error::invalid_params("No parameters provided"))
}
// 定义add方法,计算两个数的和
async fn add(params: Vec<Value>) -> Result<Value, jsonrpsee_core::Error> {
if params.len() != 2 {
return Err(jsonrpsee_core::Error::invalid_params("Expected 2 parameters"));
}
let a = params[0].as_f64().ok_or_else(|| jsonrpsee_core::Error::invalid_params("First parameter must be a number"))?;
let b = params[1].as_f64().ok_or_else(|| jsonrpsee_core::Error::invalid_params("Second parameter must be a number"))?;
Ok(json!(a + b))
}
// 定义get_time方法,返回当前时间戳
async fn get_time(_params: Vec<Value>) -> Result<Value, jsonrpsee_core::Error> {
Ok(json!(std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs()))
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 创建服务器构建器
let server = WsServerBuilder::default()
.build("127.0.0.1:9944".parse::<SocketAddr>()?)
.await?;
// 创建RPC模块并注册方法
let mut module = jsonrpsee_ws_server::RpcModule::new();
module.register_method("echo", echo)?;
module.register_method("add", add)?;
module.register_method("get_time", get_time)?;
// 启动服务器
let server_handle = server.start(module)?;
println!("JSON-RPC WebSocket服务器已启动: ws://127.0.0.1:9944");
println!("可用方法:");
println!("- echo(params): 返回第一个参数");
println!("- add(a, b): 返回两个数的和");
println!("- get_time(): 返回当前UNIX时间戳");
// 等待服务器关闭
server_handle.stopped().await;
Ok(())
}
文档
更多详细使用方法请参考官方文档。
1 回复
Rust WebSocket服务器库jsonrpsee-ws-server的使用
简介
jsonrpsee-ws-server
是一个轻量级的Rust库,实现了JSON-RPC 2.0协议并通过WebSocket提供高性能通信。它是jsonrpsee
框架的一部分,专门用于WebSocket服务器端的JSON-RPC实现。
主要特点:
- 完全兼容JSON-RPC 2.0规范
- 异步设计,基于tokio运行时
- 支持WebSocket全双工通信
- 轻量级且高性能
- 提供类型安全的RPC调用
安装
在Cargo.toml中添加依赖:
[dependencies]
jsonrpsee-ws-server = "0.16"
tokio = { version = "1.0", features = ["full"] }
完整示例代码
下面是一个整合了多个功能的完整示例,包括服务器和客户端代码:
服务器端代码 (server.rs)
use jsonrpsee_ws_server::{WsServerBuilder, RpcModule, SubscriptionSink, Error};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::Duration;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 创建共享状态
let counter = Arc::new(AtomicU32::new(0));
let connections = Arc::new(Mutex::new(0));
// 创建服务器实例
let server = WsServerBuilder::default()
.max_connections(10)
.max_request_body_size(1024 * 1024) // 1MB
.build("127.0.0.1:9944")
.await?;
// 创建RPC模块
let mut module = RpcModule::new(());
// 克隆共享状态
let counter_clone = counter.clone();
let connections_clone = connections.clone();
// 注册简单方法
module.register_method("say_hello", |_, _| {
Ok("Hello from JSON-RPC over WebSocket!")
})?;
// 注册异步方法
module.register_async_method("async_hello", |_, _| async move {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok("Delayed hello after 1 second!")
})?;
// 注册带参数的方法
module.register_method("add", |params, _| {
let (a, b): (i32, i32) = params.parse()?;
Ok(a + b)
})?;
// 使用上下文数据的方法
module.register_method("increment", move |_, _| {
let prev = counter_clone.fetch_add(1, Ordering::SeqCst);
Ok(prev)
})?;
module.register_method("get_count", move |_, _| {
Ok(counter.load(Ordering::SeqCst))
})?;
// 错误处理方法
module.register_method("divide", |params, _| {
let (a, b): (f64, f64) = params.parse()?;
if b == 0.0 {
return Err(Error::Custom("Division by zero".into()));
}
Ok(a / b)
})?;
// 连接计数方法
module.register_method("get_connections", move |_, _| {
Ok(connections.load(Ordering::SeqCst))
})?;
// 订阅方法
module.register_subscription("subscribe_ticks", "tick", "unsubscribe_ticks", move |_, mut sink, _| {
let counter_clone = counter.clone();
async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
// 增加连接计数
{
let mut conn = connections_clone.lock().await;
*conn += 1;
}
loop {
interval.tick().await;
let count = counter_clone.load(Ordering::SeqCst);
if sink.send(&count).await.is_err() {
break;
}
}
// 减少连接计数
{
let mut conn = connections_clone.lock().await;
*conn -= 1;
}
Ok(())
}
})?;
// 启动服务器
let addr = server.local_addr()?;
println!("Server listening on ws://{}", addr);
server.start(module).await?;
Ok(())
}
客户端代码 (client.rs)
use jsonrpsee_ws_client::{WsClientBuilder};
use futures::StreamExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 创建客户端
let client = WsClientBuilder::default()
.build("ws://127.0.0.1:9944")
.await?;
// 调用简单方法
let response: String = client.request("say_hello", None).await?;
println!("Server response: {}", response);
// 调用异步方法
let delayed: String = client.request("async_hello", None).await?;
println!("Async response: {}", delayed);
// 调用带参数的方法
let sum: i32 = client.request("add", (5, 7)).await?;
println!("5 + 7 = {}", sum);
// 调用状态方法
let _ = client.request::<_, u32>("increment", None).await?;
let count: u32 = client.request("get_count", None).await?;
println!("Current count: {}", count);
// 错误处理测试
match client.request::<_, f64>("divide", (10.0, 0.0)).await {
Ok(_) => println!("Division succeeded (unexpected)"),
Err(e) => println!("Division error: {}", e),
}
// 订阅测试
println!("Subscribing to ticks...");
let mut sub = client.subscribe("subscribe_ticks", None, "unsubscribe_ticks").await?;
tokio::spawn(async move {
while let Some(Ok(tick)) = sub.next().await {
println!("Received tick: {}", tick);
}
println!("Subscription ended");
});
// 保持客户端运行
tokio::signal::ctrl_c().await?;
println!("Shutting down");
Ok(())
}
运行说明
- 创建Cargo项目并添加依赖:
[dependencies]
jsonrpsee-ws-server = "0.16"
jsonrpsee-ws-client = "0.16"
tokio = { version = "1.0", features = ["full"] }
anyhow = "1.0"
-
将服务器代码保存为
server.rs
,客户端代码保存为client.rs
-
在
Cargo.toml
中添加二进制目标:
[[bin]]
name = "server"
path = "src/server.rs"
[[bin]]
name = "client"
path = "src/client.rs"
- 运行服务器:
cargo run --bin server
- 在另一个终端运行客户端:
cargo run --bin client
这个完整示例展示了jsonrpsee-ws-server
的主要功能:
- 基本RPC方法调用
- 异步方法
- 带参数的方法
- 共享状态管理
- 错误处理
- 订阅模式
- 连接管理
- 性能优化(使用Arc和Mutex)
您可以根据需要修改或扩展这个示例来满足特定需求。