Rust RPC框架sp-rpc的使用:高性能分布式系统通信库sp-rpc的功能与实现解析

Rust RPC框架sp-rpc的使用:高性能分布式系统通信库sp-rpc的功能与实现解析

Substrate RPC原语和工具。

基本信息

  • 许可证: Apache-2.0
  • 版本: 36.0.0
  • 大小: 20.6 KiB
  • 发布时间: 13天前
  • 2021 edition

安装方式

在项目目录下运行以下Cargo命令:

cargo add sp-rpc

或者在Cargo.toml中添加:

sp-rpc = "36.0.0"

所有者

  • paritytech/Core开发团队
  • Parity Crate所有者

示例代码

以下是一个使用sp-rpc的基本示例:

use sp_rpc::{RpcHandler, RpcRequest, RpcResponse};

// 定义一个简单的RPC处理程序
struct MyRpcHandler;

impl RpcHandler for MyRpcHandler {
    fn handle_request(&self, request: RpcRequest) -> RpcResponse {
        // 处理RPC请求并返回响应
        match request.method.as_str() {
            "example_method" => RpcResponse::success("Hello from RPC!"),
            _ => RpcResponse::error("Method not found"),
        }
    }
}

fn main() {
    // 创建RPC处理程序实例
    let handler = MyRpcHandler;
    
    // 模拟一个RPC请求
    let request = RpcRequest {
        method: "example_method".to_string(),
        params: vec![],
        id: 1,
    };
    
    // 处理请求并获取响应
    let response = handler.handle_request(request);
    
    println!("RPC Response: {:?}", response);
}

完整示例demo

以下是一个更完整的sp-rpc使用示例,包含客户端和服务器端实现:

use sp_rpc::{RpcHandler, RpcRequest, RpcResponse};
use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use serde_json::{to_string, from_str};

// 自定义RPC服务处理程序
struct CalculatorService;

impl RpcHandler for CalculatorService {
    fn handle_request(&self, request: RpcRequest) -> RpcResponse {
        match request.method.as_str() {
            "add" => {
                // 解析参数
                if request.params.len() != 2 {
                    return RpcResponse::error("Invalid parameters");
                }
                let a: f64 = request.params[0].as_f64().unwrap_or(0.0);
                let b: f64 = request.params[1].as_f64().unwrap_or(0.0);
                RpcResponse::success(a + b)
            },
            "multiply" => {
                // 解析参数
                if request.params.len() != 2 {
                    return RpcResponse::error("Invalid parameters");
                }
                let a: f64 = request.params[0].as_f64().unwrap_or(0.0);
                let b: f64 = request.params[1].as_f64().unwrap_or(0.0);
                RpcResponse::success(a * b)
            },
            _ => RpcResponse::error("Method not supported")
        }
    }
}

// RPC服务器
fn start_rpc_server() {
    let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
    let handler = CalculatorService;
    
    println!("RPC Server started on 127.0.0.1:8080");
    
    for stream in listener.incoming() {
        match stream {
            Ok(mut stream) => {
                // 读取请求数据
                let mut buffer = [0; 1024];
                let bytes_read = stream.read(&mut buffer).unwrap();
                let request_str = String::from_utf8_lossy(&buffer[..bytes_read]);
                
                // 解析RPC请求
                let request: RpcRequest = from_str(&request_str).unwrap();
                
                // 处理请求
                let response = handler.handle_request(request);
                
                // 发送响应
                let response_str = to_string(&response).unwrap();
                stream.write_all(response_str.as_bytes()).unwrap();
            }
            Err(e) => {
                println!("Error: {}", e);
            }
        }
    }
}

// RPC客户端
fn rpc_client() {
    let mut stream = TcpStream::connect("127.0.0.1:8080").unwrap();
    
    // 创建加法请求
    let add_request = RpcRequest {
        method: "add".to_string(),
        params: vec![serde_json::json!(2.5), serde_json::json!(3.5)],
        id: 1
    };
    
    // 发送请求
    let request_str = to_string(&add_request).unwrap();
    stream.write_all(request_str.as_bytes()).unwrap();
    
    // 读取响应
    let mut buffer = [0; 1024];
    let bytes_read = stream.read(&mut buffer).unwrap();
    let response_str = String::from_utf8_lossy(&buffer[..bytes_read]);
    let response: RpcResponse = from_str(&response_str).unwrap();
    
    println!("Add result: {:?}", response);
    
    // 创建乘法请求
    let multiply_request = RpcRequest {
        method: "multiply".to_string(),
        params: vec![serde_json::json!(4.0), serde_json::json!(5.0)],
        id: 2
    };
    
    // 发送请求
    let request_str = to_string(&multiply_request).unwrap();
    stream.write_all(request_str.as_bytes()).unwrap();
    
    // 读取响应
    let bytes_read = stream.read(&mut buffer).unwrap();
    let response_str = String::from_utf8_lossy(&buffer[..bytes_read]);
    let response: RpcResponse = from_str(&response_str).unwrap();
    
    println!("Multiply result: {:?}", response);
}

fn main() {
    // 在实际应用中,服务器和客户端应该分开运行
    // 这里为了演示,先启动服务器线程,然后运行客户端
    
    std::thread::spawn(|| {
        start_rpc_server();
    });
    
    // 给服务器一点启动时间
    std::thread::sleep(std::time::Duration::from_secs(1));
    
    // 运行客户端
    rpc_client();
}

这个完整示例展示了:

  1. 实现一个包含加法和乘法功能的RPC服务
  2. 创建TCP基础的RPC服务器
  3. 实现RPC客户端通信
  4. 使用JSON进行请求和响应的序列化
  5. 完整的请求-响应处理流程

sp-rpc为构建高性能分布式系统提供了基础通信工具,特别适合区块链和分布式应用场景的需求。


1 回复

Rust RPC框架sp-rpc的使用:高性能分布式系统通信库sp-rpc的功能与实现解析

完整示例demo

以下是基于sp-rpc框架构建完整RPC服务的示例代码,包含服务端和客户端实现:

服务端代码 (server.rs)

use sp_rpc::{rpc_service, Server};
use sp_rpc::async_trait;

// 1. 定义RPC服务接口
#[rpc_service]
trait Calculator {
    async fn add(&self, a: i32, b: i32) -> i32;
    async fn subtract(&self, a: i32, b: i32) -> i32;
    async fn multiply(&self, a: i32, b: i32) -> i32;
}

// 2. 实现服务逻辑
struct CalculatorService;

#[async_trait]
impl Calculator for CalculatorService {
    async fn add(&self, a: i32, b: i32) -> i32 {
        a + b
    }
    
    async fn subtract(&self, a: i32, b: i32) -> i32 {
        a - b
    }
    
    async fn multiply(&self, a: i32, b: i32) -> i32 {
        a * b
    }
}

// 3. 启动RPC服务器
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建服务实例
    let service = CalculatorService;
    
    // 绑定服务器到指定地址
    let server = Server::bind("127.0.0.1:8080")
        .await?;
    
    println!("Server running on 127.0.0.1:8080");
    
    // 启动服务
    server.serve(service).await?;
    
    Ok(())
}

客户端代码 (client.rs)

use sp_rpc::Client;
use std::time::Duration;

// 复用服务端定义的trait(实际项目中应该放在共享库中)
#[sp_rpc::rpc_service]
trait Calculator {
    async fn add(&self, a: i32, b: i32) -> i32;
    async fn subtract(&self, a: i32, b: i32) -> i32;
    async fn multiply(&self, a: i32, b: i32) -> i32;
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接RPC服务器
    let client = Client::connect("127.0.0.1:8080")
        .timeout(Duration::from_secs(5))
        .await?;
    
    // 调用远程方法
    let add_result = client.add(10, 5).await?;
    let sub_result = client.subtract(10, 5).await?;
    let mul_result = client.multiply(10, 5).await?;
    
    // 打印结果
    println!("10 + 5 = {}", add_result);
    println!("10 - 5 = {}", sub_result);
    println!("10 * 5 = {}", mul_result);
    
    Ok(())
}

运行说明

  1. 创建新项目并添加依赖:
[dependencies]
sp-rpc = "0.5"
tokio = { version = "1.0", features = ["full"] }
  1. 先启动服务端:
cargo run --bin server
  1. 再运行客户端:
cargo run --bin client
  1. 预期输出:
10 + 5 = 15
10 - 5 = 5
10 * 5 = 50

高级功能示例:带中间件和自定义序列化

// 在服务端代码中添加:
use sp_rpc::codec::{Codec, BincodeCodec};
use sp_rpc::middleware::{Middleware, Next};
use sp_rpc::Request;

// 自定义中间件
struct MetricsMiddleware;

#[sp_rpc::async_trait]
impl Middleware for MetricsMiddleware {
    async fn call(&self, req: Request, next: Next<'_>) -> Result<Vec<u8>, sp_rpc::Error> {
        let start = std::time::Instant::now();
        let res = next.run(req).await;
        let duration = start.elapsed();
        println!("Request took {:?}", duration);
        res
    }
}

// 修改服务器启动代码:
let server = Server::bind("127.0.0.1:8080")
    .with_codec(BincodeCodec)  // 使用二进制序列化
    .with_middleware(MetricsMiddleware)  // 添加中间件
    .await?;

这个完整示例展示了sp-rpc框架的核心功能,包括:

  • 服务定义与实现
  • 服务端启动
  • 客户端调用
  • 中间件集成
  • 自定义序列化方式

开发者可以根据实际需求扩展这个基础框架,构建更复杂的分布式系统。

回到顶部