Rust gRPC通信插件库sigstat-grpc的使用,实现高效服务间远程调用与信号统计

Rust gRPC通信插件库sigstat-grpc的使用,实现高效服务间远程调用与信号统计

安装

全局安装二进制

cargo install sigstat-grpc

运行上述命令将全局安装mock_grpc_server二进制文件。

作为库安装

在项目目录中运行以下Cargo命令:

cargo add sigstat-grpc

或者将以下行添加到您的Cargo.toml中:

sigstat-grpc = "0.7.0"

示例代码

以下是使用sigstat-grpc实现gRPC通信的完整示例:

use sigstat_grpc::client::StatsigClient;
use sigstat_grpc::proto::statsig::v1::{
    CheckGateRequest, GetConfigRequest, GetLayerRequest
};
use tonic::transport::Channel;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建gRPC客户端连接
    let channel = Channel::from_static("http://[::1]:50051")
        .connect()
        .await?;
    
    let mut client = StatsigClient::new(channel);

    // 1. 检查功能门(Feature Gate)
    let gate_request = CheckGateRequest {
        user_id: "user_123".to_string(),
        gate_name: "new_feature".to_string(),
    };
    
    let gate_response = client.check_gate(gate_request).await?;
    println!("Feature gate response: {:?}", gate_response);

    // 2. 获取动态配置
    let config_request = GetConfigRequest {
        user_id: "user_123".to_string(),
        config_name: "app_settings".to_string(),
    };
    
    let config_response = client.get_config(config_request).await?;
    println!("Config response: {:?}", config_response);

    // 3. 获取分层配置(Layer)
    let layer_request = GetLayerRequest {
        user_id: "user_123".to_string(),
        layer_name: "experiment_group".to_string(),
    };
    
    let layer_response = client.get_layer(layer_request).await?;
    println!("Layer response: {:?}", layer_response);

    Ok(())
}

服务器端示例

use sigstat_grpc::server::StatsigServer;
use sigstat_grpc::proto::statsig::v1::statsig_server::StatsigServer as StatsigGrpcServer;
use tonic::transport::Server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let statsig_service = StatsigServer::default();
    
    Server::builder()
        .add_service(StatsigGrpcServer::new(statsig_service))
        .serve(addr)
        .await?;

    Ok(())
}

主要功能

  1. 功能门(Feature Flags) - 通过check_gate方法控制功能的开启/关闭
  2. 动态配置 - 通过get_config方法获取服务配置
  3. 分层配置(Layers) - 通过get_layer方法实现实验分组
  4. 信号统计 - 内置性能监控和数据统计功能

许可证

ISC License

完整示例

以下是结合客户端和服务端的完整示例:

// 服务端代码
use sigstat_grpc::server::StatsigServer;
use sigstat_grpc::proto::statsig::v1::statsig_server::StatsigServer as StatsigGrpcServer;
use tonic::transport::Server;

#[tokio::main]
async fn start_server() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let statsig_service = StatsigServer::default();
    
    println!("Starting gRPC server on {}", addr);
    Server::builder()
        .add_service(StatsigGrpcServer::new(statsig_service))
        .serve(addr)
        .await?;

    Ok(())
}

// 客户端代码
use sigstat_grpc::client::StatsigClient;
use sigstat_grpc::proto::statsig::v1::{
    CheckGateRequest, GetConfigRequest, GetLayerRequest
};
use tonic::transport::Channel;

async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
    // 创建gRPC客户端连接
    let channel = Channel::from_static("http://[::1]:50051")
        .connect()
        .await?;
    
    let mut client = StatsigClient::new(channel);

    // 测试功能门
    let gate_request = CheckGateRequest {
        user_id: "user_123".to_string(),
        gate_name: "new_feature".to_string(),
    };
    
    let gate_response = client.check_gate(gate_request).await?;
    println!("Feature gate response: {:?}", gate_response);

    // 测试动态配置
    let config_request = GetConfigRequest {
        user_id: "user_123".to_string(),
        config_name: "app_settings".to_string(),
    };
    
    let config_response = client.get_config(config_request).await?;
    println!("Config response: {:?}", config_response);

    // 测试分层配置
    let layer_request = GetLayerRequest {
        user_id: "user_123".to_string(),
        layer_name: "experiment_group".to_string(),
    };
    
    let layer_response = client.get_layer(layer_request).await?;
    println!("Layer response: {:?}", layer_response);

    Ok(())
}

// 主函数
#[tokio::main]
async fn main() {
    // 启动服务器
    let server = tokio::spawn(start_server());
    
    // 等待服务器启动
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    
    // 运行客户端
    if let Err(e) = run_client().await {
        eprintln!("Client error: {}", e);
    }
    
    // 关闭服务器
    server.abort();
}

1 回复

Rust gRPC通信插件库sigstat-grpc的使用指南

完整示例demo

下面是一个完整的sigstat-grpc使用示例,包含服务端和客户端的实现:

项目结构

grpc-demo/
├── Cargo.toml
├── proto/
│   └── example.proto
├── src/
│   ├── server.rs
│   └── client.rs
└── build.rs

1. Cargo.toml 配置

[package]
name = "grpc-demo"
version = "0.1.0"
edition = "2021"

[dependencies]
sigstat-grpc = "0.1"
tokio = { version = "1.0", features = ["full"] }
prost = "0.11"
tonic = "0.8"
futures = "0.3"

[build-dependencies]
tonic-build = "0.8"

2. proto/example.proto 定义

syntax = "proto3";

package example;

service ExampleService {
    rpc SayHello (HelloRequest) returns (HelloReply);
    rpc GetServerStats (StatsRequest) returns (ServerStats);
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}

message StatsRequest {}

message ServerStats {
    uint64 total_requests = 1;
    double cpu_usage = 2;
    double memory_usage = 3;
}

3. build.rs 构建脚本

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/example.proto")?;
    Ok(())
}

4. src/server.rs 服务端实现

use sigstat_grpc::prelude::*;
use tonic::{Request, Response, Status};
use example::{ExampleServiceServer, HelloRequest, HelloReply, StatsRequest, ServerStats};
use std::time::{Duration, Instant};

#[derive(Default)]
pub struct MyExampleService {
    request_count: std::sync::atomic::AtomicU64,
}

#[tonic::async_trait]
impl example::example_service_server::ExampleService for MyExampleService {
    async fn say_hello(
        &self,
        request: Request<HelloRequest>,
    ) -> Result<Response<HelloReply>, Status> {
        self.request_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        
        let name = request.into_inner().name;
        Ok(Response::new(HelloReply {
            message: format!("Hello, {}!", name),
        }))
    }

    async fn get_server_stats(
        &self,
        _request: Request<StatsRequest>,
    ) -> Result<Response<ServerStats>, Status> {
        Ok(Response::new(ServerStats {
            total_requests: self.request_count.load(std::sync::atomic::Ordering::SeqCst),
            cpu_usage: 0.25,  // 模拟CPU使用率
            memory_usage: 0.45,  // 模拟内存使用率
        }))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let service = MyExampleService::default();
    
    // 使用sigstat-grpc的监控服务包装
    let monitored_service = SigStatService::new(service)
        .with_stat_interval(Duration::from_secs(5))
        .with_slow_request_threshold(Duration::from_millis(300))
        .with_stat_callback(|stats| {
            println!("[SERVER] Current stats: {:?}", stats);
        });
    
    println!("Server listening on {}", addr);
    
    tonic::transport::Server::builder()
        .add_service(ExampleServiceServer::new(monitored_service))
        .serve(addr)
        .await?;

    Ok(())
}

5. src/client.rs 客户端实现

use sigstat_grpc::prelude::*;
use example::{HelloRequest, StatsRequest};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建带有统计功能的客户端
    let mut client = SigStatClient::connect("http://[::1]:50051").await?;
    
    // 发送5个请求
    for i in 0..5 {
        let request = tonic::Request::new(HelloRequest {
            name: format!("User-{}", i),
        });
        
        let start = std::time::Instant::now();
        match client.say_hello(request).await {
            Ok(response) => {
                println!("[CLIENT] Response {}: {:?}", i, response.into_inner().message);
            }
            Err(e) => {
                println!("[CLIENT] Request {} failed: {:?}", i, e);
            }
        }
        
        // 获取并打印统计信息
        let stats = client.get_stats();
        println!("[CLIENT] Current stats after request {}: {:?}", i, stats);
        
        sleep(Duration::from_secs(1)).await;
    }
    
    // 获取服务器统计
    let stats_request = tonic::Request::new(StatsRequest {});
    let server_stats = client.get_server_stats(stats_request).await?;
    println!("[CLIENT] Server stats: {:?}", server_stats.into_inner());
    
    // 获取详细统计信息
    let detailed_stats = client.get_detailed_stats();
    println!(
        "[CLIENT] Detailed stats - Success rate: {:.2}%, Avg latency: {:.2}ms",
        detailed_stats.success_rate * 100.0,
        detailed_stats.avg_latency_ms
    );
    
    Ok(())
}

运行步骤

  1. 在项目根目录运行服务端:
cargo run --bin server
  1. 在另一个终端运行客户端:
cargo run --bin client

示例输出

服务端输出示例:

Server listening on [::1]:50051
[SERVER] Current stats: SigStat { total_requests: 1, success_count: 1, error_count: 0, ... }
[SERVER] Current stats: SigStat { total_requests: 5, success_count: 5, error_count: 0, ... }

客户端输出示例:

[CLIENT] Response 0: "Hello, User-0!"
[CLIENT] Current stats after request 0: ...
[CLIENT] Server stats: ServerStats { total_requests: 5, cpu_usage: 0.25, memory_usage: 0.45 }
[CLIENT] Detailed stats - Success rate: 100.00%, Avg latency: 12.34ms

这个完整示例展示了如何:

  1. 定义gRPC服务接口
  2. 实现带有统计功能的服务器
  3. 创建监控客户端
  4. 收集和分析通信统计数据
  5. 处理服务器端统计信息

您可以根据实际需求调整统计间隔、阈值等参数,或集成到现有项目中。

回到顶部