Rust gRPC客户端库yellowstone-grpc-client的使用,实现高性能跨平台gRPC通信与数据交互
Rust gRPC客户端库yellowstone-grpc-client的使用,实现高性能跨平台gRPC通信与数据交互
简单的gRPC客户端连接到Yellowstone gRPC Geyser
查看存储库中的使用示例。
安装
在项目目录中运行以下Cargo命令:
cargo add yellowstone-grpc-client
或者将以下行添加到Cargo.toml:
yellowstone-grpc-client = "9.0.0"
完整示例代码
use yellowstone_grpc_client::GeyserClient;
use solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin;
use tonic::transport::Channel;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建gRPC客户端连接
let channel = Channel::from_static("http://[::1]:10000")
.connect()
.await?;
// 实例化Geyser客户端
let mut client = GeyserClient::new(channel);
// 配置订阅参数
let subscribe_request = tonic::Request::new(
solana_geyser_plugin_interface::geyser_plugin_interface::SubscribeRequest {
slots: vec![],
accounts: vec![],
transactions: vec![],
entries: vec![],
accounts_data_slice: vec![],
commitment: Some(solana_geyser_plugin_interface::geyser_plugin_interface::CommitmentLevel::Confirmed as i32),
}
);
// 发起订阅请求
let mut stream = client
.subscribe(subscribe_request)
.await?
.into_inner();
// 处理接收到的消息流
while let Some(message) = stream.message().await? {
// 处理不同类型的更新消息
match message.update_oneof {
Some(solana_geyser_plugin_interface::geyser_plugin_interface::subscribe_update::UpdateOneof::SlotUpdate(update)) => {
println!("收到插槽更新: {:?}", update);
}
Some(solana_geyser_plugin_interface::geyser_plugin_interface::subscribe_update::UpdateOneof::AccountUpdate(update)) => {
println!("收到账户更新: {:?}", update);
}
Some(solana_geyser_plugin_interface::geyser_plugin_interface::subscribe_update::UpdateOneof::TransactionUpdate(update)) => {
println!("收到交易更新: {:?}", update);
}
_ => {}
}
}
Ok(())
}
功能特性
- 高性能gRPC客户端实现
- 跨平台兼容性
- 实时数据流订阅
- 支持Solana区块链数据交互
- 异步非阻塞IO
依赖配置
确保在Cargo.toml中包含必要的依赖:
[dependencies]
yellowstone-grpc-client = "9.0.0"
tokio = { version = "1.0", features = ["full"] }
tonic = "0.8"
prost = "0.11"
使用说明
- 连接到Yellowstone gRPC服务端
- 配置订阅的数据类型(插槽、账户、交易等)
- 处理实时数据流更新
- 根据业务需求处理接收到的数据
该库提供了与Solana区块链Geyser接口的高性能gRPC通信能力,支持实时数据订阅和处理。
1 回复
Rust gRPC客户端库yellowstone-grpc-client使用指南
概述
yellowstone-grpc-client是一个高性能的Rust gRPC客户端库,专为跨平台gRPC通信和数据交互设计。它提供了简洁的API接口和强大的功能支持,适用于需要高效网络通信的各种应用场景。
主要特性
- 异步/await支持
- 跨平台兼容性
- 连接池管理
- 负载均衡
- 健康检查
- TLS/SSL加密支持
- 双向流式传输
安装方法
在Cargo.toml中添加依赖:
[dependencies]
yellowstone-grpc-client = "0.5"
tokio = { version = "1.0", features = ["full"] }
prost = "0.11"
tonic = "0.8"
基本使用方法
1. 创建gRPC客户端
use yellowstone_grpc_client::Client;
use tonic::transport::Channel;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建客户端连接
let mut client = Client::connect("http://[::1]:50051").await?;
// 使用客户端进行通信
// ...
Ok(())
}
2. 一元RPC调用示例
use yellowstone_grpc_client::Client;
use tonic::Request;
// 假设有以下protobuf定义的消息
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloRequest {
#[prost(string, tag = "1")]
pub name: String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloResponse {
#[prost(string, tag = "1")]
pub message: String,
}
async fn call_unary_rpc() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
let request = Request::new(HelloRequest {
name: "World".to_string(),
});
let response = client.hello(request).await?;
println!("响应: {}", response.into_inner().message);
Ok(())
}
3. 服务器端流式调用
async fn server_streaming_example() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
let request = Request::new(HelloRequest {
name: "Stream".to_string(),
});
let mut stream = client.server_streaming_hello(request).await?.into_inner();
while let Some(response) = stream.message().await? {
println!("收到流式响应: {}", response.message);
}
Ok(())
}
4. 客户端流式调用
async fn client_streaming_example() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
let request = tonic::Request::new(tokio_stream::iter(vec![
HelloRequest { name: "消息1".to_string() },
HelloRequest { name: "消息2".to_string() },
HelloRequest { name: "消息3".to_string() },
]));
let response = client.client_streaming_hello(request).await?;
println!("服务器响应: {}", response.into_inner().message);
Ok(())
}
5. 双向流式调用
async fn bidirectional_streaming_example() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
let request = Request::new(tokio_stream::iter(vec![
HelloRequest { name: "请求1".to_string() },
HelloRequest { name: "请求2".to_string() },
]));
let mut stream = client.bidirectional_streaming_hello(request).await?.into_inner();
while let Some(response) = stream.message().await? {
println!("收到双向流响应: {}", response.message);
}
Ok(())
}
高级配置
连接池配置
use yellowstone_grpc_client::Client;
use tonic::transport::Endpoint;
async fn configure_connection_pool() -> Result<(), Box<dyn std::error::Error>> {
let endpoint = Endpoint::from_static("http://localhost:50051")
.concurrency_limit(10) // 最大并发连接数
.timeout(std::time::Duration::from_secs(30))
.connect_timeout(std::time::Duration::from_secs(5));
let client = Client::connect(endpoint).await?;
Ok(())
}
TLS/SSL配置
use tonic::transport::{Channel, ClientTlsConfig};
async fn tls_connection() -> Result<(), Box<dyn std::error::Error>> {
let tls = ClientTlsConfig::new()
.ca_certificate(tonic::transport::Certificate::from_pem(include_bytes!("ca.pem")))
.domain_name("example.com");
let channel = Channel::from_static("https://example.com:50051")
.tls_config(tls)?
.connect()
.await?;
let client = Client::new(channel);
Ok(())
}
错误处理
use yellowstone_grpc_client::Client;
use tonic::{Status, Code};
async fn handle_errors() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
match client.hello(Request::new(HelloRequest { name: "test".to_string() })).await {
Ok(response) => {
println!("成功: {}", response.into_inner().message);
}
Err(status) => {
match status.code() {
Code::NotFound => eprintln!("服务未找到"),
Code::DeadlineExceeded => eprintln!("请求超时"),
Code::Unauthenticated => eprintln!("认证失败"),
_ => eprintln!("其他错误: {}", status),
}
}
}
Ok(())
}
性能优化建议
- 重用客户端实例:避免频繁创建和销毁客户端
- 使用连接池:合理配置连接池大小
- 批量处理:对于大量小请求,考虑使用流式传输
- 超时设置:根据业务需求设置合理的超时时间
- 监控指标:实现监控和日志记录以便性能分析
注意事项
- 确保gRPC服务端兼容yellowstone-grpc-client的版本
- 在生产环境中使用TLS加密通信
- 合理处理连接断开和重连逻辑
- 注意资源清理,及时关闭不再使用的流
这个库为Rust开发者提供了强大而灵活的gRPC客户端解决方案,适用于各种分布式系统和微服务架构。
完整示例demo
//! yellowstone-grpc-client 完整使用示例
use yellowstone_grpc_client::Client;
use tonic::{Request, Status, Code};
use tonic::transport::{Endpoint, Channel, ClientTlsConfig};
use tokio_stream::StreamExt;
// Protobuf 消息定义
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloRequest {
#[prost(string, tag = "1")]
pub name: String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloResponse {
#[prost(string, tag = "1")]
pub message: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 创建基础客户端连接
println!("创建基础客户端连接...");
let mut client = Client::connect("http://localhost:50051").await?;
// 2. 一元RPC调用示例
println!("\n执行一元RPC调用...");
match call_unary_rpc(&mut client).await {
Ok(_) => println!("一元调用成功"),
Err(e) => eprintln!("一元调用失败: {}", e),
}
// 3. 使用连接池配置
println!("\n配置连接池...");
let endpoint = Endpoint::from_static("http://localhost:50051")
.concurrency_limit(10)
.timeout(std::time::Duration::from_secs(30))
.connect_timeout(std::time::Duration::from_secs(5));
let pool_client = Client::connect(endpoint).await?;
// 4. 错误处理示例
println!("\n测试错误处理...");
handle_errors_example().await?;
println!("\n所有示例执行完成!");
Ok(())
}
/// 一元RPC调用函数
async fn call_unary_rpc(client: &mut Client) -> Result<(), Box<dyn std::error::Error>> {
let request = Request::new(HelloRequest {
name: "World".to_string(),
});
let response = client.hello(request).await?;
println!("服务器响应: {}", response.into_inner().message);
Ok(())
}
/// 服务器端流式调用示例
async fn server_streaming_example() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
let request = Request::new(HelloRequest {
name: "Stream".to_string(),
});
let mut stream = client.server_streaming_hello(request).await?.into_inner();
while let Some(response) = stream.message().await? {
println!("收到流式响应: {}", response.message);
}
Ok(())
}
/// 客户端流式调用示例
async fn client_streaming_example() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
let request = tonic::Request::new(tokio_stream::iter(vec![
HelloRequest { name: "消息1".to_string() },
HelloRequest { name: "消息2".to_string() },
HelloRequest { name: "消息3".to_string() },
]));
let response = client.client_streaming_hello(request).await?;
println!("服务器汇总响应: {}", response.into_inner().message);
Ok(())
}
/// 双向流式调用示例
async fn bidirectional_streaming_example() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
let request = Request::new(tokio_stream::iter(vec![
HelloRequest { name: "请求1".to_string() },
HelloRequest { name: "请求2".to_string() },
HelloRequest { name: "请求3".to_string() },
]));
let mut stream = client.bidirectional_streaming_hello(request).await?.into_inner();
while let Some(response) = stream.message().await? {
println!("收到双向流响应: {}", response.message);
}
Ok(())
}
/// 错误处理示例
async fn handle_errors_example() -> Result<(), Box<dyn std::error::Error>> {
let mut client = Client::connect("http://localhost:50051").await?;
// 测试正常请求
match client.hello(Request::new(HelloRequest { name: "test".to_string() })).await {
Ok(response) => {
println!("请求成功: {}", response.into_inner().message);
}
Err(status) => {
// 根据错误码进行不同的处理
match status.code() {
Code::NotFound => eprintln!("错误: 服务未找到"),
Code::DeadlineExceeded => eprintln!("错误: 请求超时"),
Code::Unauthenticated => eprintln!("错误: 认证失败"),
Code::Unavailable => eprintln!("错误: 服务不可用"),
_ => eprintln!("其他错误: {}", status),
}
}
}
Ok(())
}
/// TLS安全连接示例
async fn tls_connection_example() -> Result<(), Box<dyn std::error::Error>> {
// 注意:需要实际的证书文件
let tls = ClientTlsConfig::new()
.ca_certificate(tonic::transport::Certificate::from_pem(include_bytes!("ca.pem")))
.domain_name("localhost");
let channel = Channel::from_static("https://localhost:50051")
.tls_config(tls)?
.connect()
.await?;
let client = Client::new(channel);
// 使用安全客户端进行通信
let request = Request::new(HelloRequest {
name: "Secure World".to_string(),
});
let response = client.hello(request).await?;
println!("安全连接响应: {}", response.into_inner().message);
Ok(())
}