Rust WebSocket服务器库jsonrpsee-ws-server的使用:轻量级JSON-RPC 2.0协议实现与高性能WebSocket通信

Rust WebSocket服务器库jsonrpsee-ws-server的使用:轻量级JSON-RPC 2.0协议实现与高性能WebSocket通信

jsonrpsee-ws-server是一个Rust实现的轻量级WebSocket服务器库,支持JSON-RPC 2.0协议,提供高性能的WebSocket通信能力。

安装

在项目目录中运行以下Cargo命令:

cargo add jsonrpsee-ws-server

或者在Cargo.toml中添加:

jsonrpsee-ws-server = "0.15.1"

完整示例代码

以下是使用jsonrpsee-ws-server创建WebSocket JSON-RPC服务器的完整示例:

use jsonrpsee_ws_server::{WsServerBuilder, WsServerHandle};
use serde_json::Value;
use std::net::SocketAddr;

// 定义RPC方法
async fn echo(params: Vec<Value>) -> Result<Value, jsonrpsee_core::Error> {
    // 简单返回第一个参数
    params.first().cloned().ok_or_else(|| jsonrpsee_core::Error::invalid_params("No parameters provided"))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 创建服务器构建器
    let server = WsServerBuilder::default()
        .build("127.0.0.1:9944".parse::<SocketAddr>()?)
        .await?;
    
    // 注册RPC方法
    let mut module = jsonrpsee_ws_server::RpcModule::new();
    module.register_method("echo", echo)?;
    
    // 启动服务器
    let server_handle = server.start(module)?;
    
    println!("Server started at ws://127.0.0.1:9944");
    
    // 等待服务器关闭
    server_handle.stopped().await;
    
    Ok(())
}

使用说明

  1. 首先创建WsServerBuilder实例并配置服务器地址
  2. 创建RpcModule并注册RPC方法
  3. 启动服务器并获取WsServerHandle
  4. 使用stopped()方法等待服务器停止

特性

  • 轻量级JSON-RPC 2.0协议实现
  • 高性能WebSocket通信
  • 异步支持
  • 易于使用的API
  • 支持自定义RPC方法

完整示例demo

以下是基于jsonrpsee-ws-server的扩展示例,演示了多个RPC方法的注册和使用:

use jsonrpsee_ws_server::{WsServerBuilder, WsServerHandle};
use serde_json::{Value, json};
use std::net::SocketAddr;

// 定义echo方法
async fn echo(params: Vec<Value>) -> Result<Value, jsonrpsee_core::Error> {
    params.first().cloned().ok_or_else(|| jsonrpsee_core::Error::invalid_params("No parameters provided"))
}

// 定义add方法,计算两个数的和
async fn add(params: Vec<Value>) -> Result<Value, jsonrpsee_core::Error> {
    if params.len() != 2 {
        return Err(jsonrpsee_core::Error::invalid_params("Expected 2 parameters"));
    }
    
    let a = params[0].as_f64().ok_or_else(|| jsonrpsee_core::Error::invalid_params("First parameter must be a number"))?;
    let b = params[1].as_f64().ok_or_else(|| jsonrpsee_core::Error::invalid_params("Second parameter must be a number"))?;
    
    Ok(json!(a + b))
}

// 定义get_time方法,返回当前时间戳
async fn get_time(_params: Vec<Value>) -> Result<Value, jsonrpsee_core::Error> {
    Ok(json!(std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs()))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 创建服务器构建器
    let server = WsServerBuilder::default()
        .build("127.0.0.1:9944".parse::<SocketAddr>()?)
        .await?;
    
    // 创建RPC模块并注册方法
    let mut module = jsonrpsee_ws_server::RpcModule::new();
    module.register_method("echo", echo)?;
    module.register_method("add", add)?;
    module.register_method("get_time", get_time)?;
    
    // 启动服务器
    let server_handle = server.start(module)?;
    
    println!("JSON-RPC WebSocket服务器已启动: ws://127.0.0.1:9944");
    println!("可用方法:");
    println!("- echo(params): 返回第一个参数");
    println!("- add(a, b): 返回两个数的和");
    println!("- get_time(): 返回当前UNIX时间戳");
    
    // 等待服务器关闭
    server_handle.stopped().await;
    
    Ok(())
}

文档

更多详细使用方法请参考官方文档。


1 回复

Rust WebSocket服务器库jsonrpsee-ws-server的使用

简介

jsonrpsee-ws-server是一个轻量级的Rust库,实现了JSON-RPC 2.0协议并通过WebSocket提供高性能通信。它是jsonrpsee框架的一部分,专门用于WebSocket服务器端的JSON-RPC实现。

主要特点:

  • 完全兼容JSON-RPC 2.0规范
  • 异步设计,基于tokio运行时
  • 支持WebSocket全双工通信
  • 轻量级且高性能
  • 提供类型安全的RPC调用

安装

在Cargo.toml中添加依赖:

[dependencies]
jsonrpsee-ws-server = "0.16"
tokio = { version = "1.0", features = ["full"] }

完整示例代码

下面是一个整合了多个功能的完整示例,包括服务器和客户端代码:

服务器端代码 (server.rs)

use jsonrpsee_ws_server::{WsServerBuilder, RpcModule, SubscriptionSink, Error};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 创建共享状态
    let counter = Arc::new(AtomicU32::new(0));
    let connections = Arc::new(Mutex::new(0));
    
    // 创建服务器实例
    let server = WsServerBuilder::default()
        .max_connections(10)
        .max_request_body_size(1024 * 1024) // 1MB
        .build("127.0.0.1:9944")
        .await?;
    
    // 创建RPC模块
    let mut module = RpcModule::new(());
    
    // 克隆共享状态
    let counter_clone = counter.clone();
    let connections_clone = connections.clone();

    // 注册简单方法
    module.register_method("say_hello", |_, _| {
        Ok("Hello from JSON-RPC over WebSocket!")
    })?;
    
    // 注册异步方法
    module.register_async_method("async_hello", |_, _| async move {
        tokio::time::sleep(Duration::from_secs(1)).await;
        Ok("Delayed hello after 1 second!")
    })?;
    
    // 注册带参数的方法
    module.register_method("add", |params, _| {
        let (a, b): (i32, i32) = params.parse()?;
        Ok(a + b)
    })?;
    
    // 使用上下文数据的方法
    module.register_method("increment", move |_, _| {
        let prev = counter_clone.fetch_add(1, Ordering::SeqCst);
        Ok(prev)
    })?;
    
    module.register_method("get_count", move |_, _| {
        Ok(counter.load(Ordering::SeqCst))
    })?;
    
    // 错误处理方法
    module.register_method("divide", |params, _| {
        let (a, b): (f64, f64) = params.parse()?;
        if b == 0.0 {
            return Err(Error::Custom("Division by zero".into()));
        }
        Ok(a / b)
    })?;
    
    // 连接计数方法
    module.register_method("get_connections", move |_, _| {
        Ok(connections.load(Ordering::SeqCst))
    })?;
    
    // 订阅方法
    module.register_subscription("subscribe_ticks", "tick", "unsubscribe_ticks", move |_, mut sink, _| {
        let counter_clone = counter.clone();
        async move {
            let mut interval = tokio::time::interval(Duration::from_secs(1));
            
            // 增加连接计数
            {
                let mut conn = connections_clone.lock().await;
                *conn += 1;
            }
            
            loop {
                interval.tick().await;
                let count = counter_clone.load(Ordering::SeqCst);
                
                if sink.send(&count).await.is_err() {
                    break;
                }
            }
            
            // 减少连接计数
            {
                let mut conn = connections_clone.lock().await;
                *conn -= 1;
            }
            
            Ok(())
        }
    })?;
    
    // 启动服务器
    let addr = server.local_addr()?;
    println!("Server listening on ws://{}", addr);
    
    server.start(module).await?;
    Ok(())
}

客户端代码 (client.rs)

use jsonrpsee_ws_client::{WsClientBuilder};
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 创建客户端
    let client = WsClientBuilder::default()
        .build("ws://127.0.0.1:9944")
        .await?;
    
    // 调用简单方法
    let response: String = client.request("say_hello", None).await?;
    println!("Server response: {}", response);
    
    // 调用异步方法
    let delayed: String = client.request("async_hello", None).await?;
    println!("Async response: {}", delayed);
    
    // 调用带参数的方法
    let sum: i32 = client.request("add", (5, 7)).await?;
    println!("5 + 7 = {}", sum);
    
    // 调用状态方法
    let _ = client.request::<_, u32>("increment", None).await?;
    let count: u32 = client.request("get_count", None).await?;
    println!("Current count: {}", count);
    
    // 错误处理测试
    match client.request::<_, f64>("divide", (10.0, 0.0)).await {
        Ok(_) => println!("Division succeeded (unexpected)"),
        Err(e) => println!("Division error: {}", e),
    }
    
    // 订阅测试
    println!("Subscribing to ticks...");
    let mut sub = client.subscribe("subscribe_ticks", None, "unsubscribe_ticks").await?;
    
    tokio::spawn(async move {
        while let Some(Ok(tick)) = sub.next().await {
            println!("Received tick: {}", tick);
        }
        println!("Subscription ended");
    });
    
    // 保持客户端运行
    tokio::signal::ctrl_c().await?;
    println!("Shutting down");
    
    Ok(())
}

运行说明

  1. 创建Cargo项目并添加依赖:
[dependencies]
jsonrpsee-ws-server = "0.16"
jsonrpsee-ws-client = "0.16"
tokio = { version = "1.0", features = ["full"] }
anyhow = "1.0"
  1. 将服务器代码保存为server.rs,客户端代码保存为client.rs

  2. Cargo.toml中添加二进制目标:

[[bin]]
name = "server"
path = "src/server.rs"

[[bin]]
name = "client"
path = "src/client.rs"
  1. 运行服务器:
cargo run --bin server
  1. 在另一个终端运行客户端:
cargo run --bin client

这个完整示例展示了jsonrpsee-ws-server的主要功能:

  • 基本RPC方法调用
  • 异步方法
  • 带参数的方法
  • 共享状态管理
  • 错误处理
  • 订阅模式
  • 连接管理
  • 性能优化(使用Arc和Mutex)

您可以根据需要修改或扩展这个示例来满足特定需求。

回到顶部