Rust区块链库yellowstone-grpc-proto的使用:高性能GRPC协议实现与Solana节点交互
Metadata
包标识:cargo/yellowstone-grpc-proto@9.0.0
20天前更新
2021版本
Apache-2.0许可证
565 KiB大小
安装
在项目目录中运行以下Cargo命令:
cargo add yellowstone-grpc-proto
或在Cargo.toml中添加以下行:
yellowstone-grpc-proto = “9.0.0”
主页 triton.one
文档 docs.rs/yellowstone-grpc-proto/9.0.0
代码库 github.com/rpcpool/yellowstone-grpc
维护者 Kyle Espinola Linus Kendall lvboudre Leafaar
完整示例代码:
// Cargo.toml
[dependencies]
yellowstone-grpc-proto = "9.0.0"
tokio = { version = "1.0", features = ["full"] }
tonic = "0.9"
prost = "0.12"
// main.rs
use tonic::transport::Channel;
use yellowstone_grpc_proto::prelude::{
geyser_client::GeyserClient, SubscribeRequest, SubscribeUpdate,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 连接到Solana节点的gRPC端点
let channel = Channel::from_static("http://localhost:10000")
.connect()
.await?;
// 创建gRPC客户端
let mut client = GeyserClient::new(channel);
// 创建订阅请求
let request = tonic::Request::new(SubscribeRequest {
// 配置订阅参数
accounts: vec![],
slots: vec![],
transactions: vec![],
transactions_status: vec![],
blocks: vec![],
blocks_meta: vec![],
entry: vec![],
commitment: Some("confirmed".to_string()),
accounts_data_slice: vec![],
});
// 发送订阅请求并处理响应流
let mut stream = client.subscribe(request).await?.into_inner();
// 处理接收到的更新
while let Some(update) = stream.message().await? {
match update {
SubscribeUpdate::Account(account_update) => {
println!("Received account update: {:?}", account_update);
}
SubscribeUpdate::Slot(slot_update) => {
println!("Received slot update: {:?}", slot_update);
}
SubscribeUpdate::Transaction(transaction_update) => {
println!("Received transaction update: {:?}", transaction_update);
}
SubscribeUpdate::Block(block_update) => {
println!("Received block update: {:?}", block_update);
}
_ => {}
}
}
Ok(())
}
1 回复
Rust区块链库yellowstone-grpc-proto使用指南
概述
yellowstone-grpc-proto是一个高性能的Rust库,专门用于通过GRPC协议与Solana区块链节点进行交互。该库提供了完整的协议缓冲区定义和客户端实现,支持Solana节点的实时数据订阅和交易处理。
主要特性
- 完整的Solana GRPC协议支持
- 高性能异步通信
- 类型安全的Rust接口
- 实时账户和交易订阅
- 区块和交易数据获取
安装方法
在Cargo.toml中添加依赖:
[dependencies]
yellowstone-grpc-proto = "0.1"
tokio = { version = "1.0", features = ["full"] }
prost = "0.11"
基本使用方法
1. 建立连接
use yellowstone_grpc_proto::prelude::*;
use yellowstone_grpc_proto::geyser_grpc_client::GeyserGrpcClient;
use tonic::transport::Channel;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let channel = Channel::from_static("http://localhost:10000")
.connect()
.await?;
let mut client = GeyserGrpcClient::new(channel);
Ok(())
}
2. 订阅账户更新
async fn subscribe_account_updates(
client: &mut GeyserGrpcClient<Channel>,
) -> Result<(), Box<dyn std::error::Error>> {
let request = SubscribeRequest {
accounts: vec!["9we6kjtbcZ2vy3GSLLsZTEhbAqXPTRvEyoxa8wxSqKp5".to_string()],
accounts_data_slice: vec![],
transactions: vec![],
transactions_status: vec![],
slots: vec![],
slots_updates: vec![],
blocks: vec![],
blocks_meta: vec![],
entry: vec![],
commitment: Some(CommitmentLevel::Confirmed.into()),
};
let mut stream = client
.subscribe(request)
.await?
.into_inner();
while let Some(response) = stream.message().await? {
println!("Received update: {:?}", response);
}
Ok(())
}
3. 获取区块信息
async fn get_block(
client: &mut GeyserGrpcClient<Channel>,
slot: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let request = GetBlockRequest {
slot,
encoding: Some(BlockEncoding::Json.into()),
max_supported_transaction_version: Some(0),
..Default::default()
};
let response = client.get_block(request).await?;
println!("Block data: {:?}", response.into_inner());
Ok(())
}
高级用法示例
批量订阅多个账户
async fn subscribe_multiple_accounts(
client: &mut GeyserGrpcClient<Channel>,
account_pubkeys: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let request = SubscribeRequest {
accounts: account_pubkeys,
accounts_data_slice: vec![],
transactions: vec![],
transactions_status: vec![],
slots: vec![],
slots_updates: vec![],
blocks: vec![],
blocks_meta: vec![],
entry: vec![],
commitment: Some(CommitmentLevel::Finalized.into()),
};
let mut stream = client.subscribe(request).await?.into_inner();
tokio::spawn(async move {
while let Some(update) = stream.message().await.unwrap() {
// 处理账户更新
handle_account_update(update).await;
}
});
Ok(())
}
async fn handle_account_update(update: SubscribeUpdate) {
match update.update_oneof {
Some(subscribe_update::UpdateOneof::Account(account_update)) => {
println!("Account updated: {:?}", account_update);
}
_ => {}
}
}
错误处理
async fn safe_subscribe(
client: &mut GeyserGrpcClient<Channel>,
) -> Result<(), Box<dyn std::error::Error>> {
match client.subscribe(SubscribeRequest::default()).await {
Ok(response) => {
let mut stream = response.into_inner();
// 处理流数据
Ok(())
}
Err(status) => {
eprintln!("GRPC error: {}", status);
Err(Box::new(status))
}
}
}
性能优化建议
- 使用连接池管理多个GRPC连接
- 合理设置订阅过滤器以减少不必要的数据传输
- 使用异步任务处理大量数据更新
- 实现适当的重连机制处理连接中断
注意事项
- 需要运行支持GRPC的Solana节点
- 确保网络连接稳定
- 适当处理背压和流量控制
- 注意内存使用,特别是在处理大量实时数据时
这个库为Rust开发者提供了与Solana区块链交互的高效方式,特别适合需要实时数据处理和高性能要求的区块链应用开发。
完整示例demo
// 完整示例:使用yellowstone-grpc-proto库进行Solana区块链交互
use yellowstone_grpc_proto::prelude::*;
use yellowstone_grpc_proto::geyser_grpc_client::GeyserGrpcClient;
use tonic::transport::Channel;
use std::time::Duration;
use tokio::sync::mpsc;
// 主函数
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 建立GRPC连接
println!("正在连接到Solana节点...");
let channel = Channel::from_static("http://localhost:10000")
.connect_timeout(Duration::from_secs(10))
.connect()
.await?;
let mut client = GeyserGrpcClient::new(channel);
println!("连接成功!");
// 示例1:订阅特定账户更新
println!("开始订阅账户更新...");
let account_pubkeys = vec![
"9we6kjtbcZ2vy3GSLLsZTEhbAqXPTRvEyoxa8wxSqKp5".to_string(),
"另一个账户公钥".to_string(), // 替换为实际的账户公钥
];
// 创建通道用于处理更新
let (tx, mut rx) = mpsc::channel(100);
// 启动订阅任务
let subscribe_handle = tokio::spawn(async move {
if let Err(e) = subscribe_multiple_accounts(&mut client, account_pubkeys, tx).await {
eprintln!("订阅失败: {}", e);
}
});
// 处理接收到的更新
while let Some(update) = rx.recv().await {
handle_account_update(update).await;
}
// 等待订阅任务完成
let _ = subscribe_handle.await;
Ok(())
}
// 订阅多个账户更新
async fn subscribe_multiple_accounts(
client: &mut GeyserGrpcClient<Channel>,
account_pubkeys: Vec<String>,
tx: mpsc::Sender<SubscribeUpdate>,
) -> Result<(), Box<dyn std::error::Error>> {
let request = SubscribeRequest {
accounts: account_pubkeys,
accounts_data_slice: vec![],
transactions: vec![],
transactions_status: vec![],
slots: vec![],
slots_updates: vec![],
blocks: vec![],
blocks_meta: vec![],
entry: vec![],
commitment: Some(CommitmentLevel::Finalized.into()),
};
let mut stream = client.subscribe(request).await?.into_inner();
// 持续监听流数据
while let Some(update) = stream.message().await? {
// 发送更新到处理通道
if let Err(e) = tx.send(update).await {
eprintln!("发送更新失败: {}", e);
break;
}
}
Ok(())
}
// 处理账户更新
async fn handle_account_update(update: SubscribeUpdate) {
match update.update_oneof {
Some(subscribe_update::UpdateOneof::Account(account_update)) => {
println!("账户更新接收时间: {:?}", std::time::SystemTime::now());
println!("账户公钥: {:?}", account_update.pubkey);
println!("账户数据长度: {} bytes", account_update.data.len());
println!("slot: {}", account_update.slot);
println!("---");
}
Some(subscribe_update::UpdateOneof::Slot(slot_update)) => {
println!("Slot更新: {:?}", slot_update);
}
Some(subscribe_update::UpdateOneof::Transaction(transaction_update)) => {
println!("交易更新: {:?}", transaction_update);
}
_ => {
println!("收到未知类型的更新");
}
}
}
// 错误处理包装函数
async fn safe_grpc_call<F, T>(call: F) -> Result<T, Box<dyn std::error::Error>>
where
F: std::future::Future<Output = Result<T, tonic::Status>>,
{
match call.await {
Ok(result) => Ok(result),
Err(status) => {
eprintln!("GRPC调用错误: {}", status);
Err(Box::new(status))
}
}
}
// 获取区块信息的示例函数
async fn get_block_example(
client: &mut GeyserGrpcClient<Channel>,
slot: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let request = GetBlockRequest {
slot,
encoding: Some(BlockEncoding::Json.into()),
max_supported_transaction_version: Some(0),
..Default::default()
};
let response = safe_grpc_call(client.get_block(request)).await?;
let block_data = response.into_inner();
println!("区块 {} 的数据:", slot);
println!("区块哈希: {:?}", block_data.blockhash);
println!("父区块哈希: {:?}", block_data.previous_blockhash);
println!("包含的交易数量: {}", block_data.transactions.len());
Ok(())
}
// 重连机制示例
async fn create_client_with_retry(
endpoint: &'static str,
max_retries: u32,
) -> Result<GeyserGrpcClient<Channel>, Box<dyn std::error::Error>> {
let mut retries = 0;
loop {
match Channel::from_static(endpoint)
.connect_timeout(Duration::from_secs(5))
.connect()
.await
{
Ok(channel) => {
return Ok(GeyserGrpcClient::new(channel));
}
Err(e) => {
retries += 1;
if retries >= max_retries {
return Err(Box::new(e));
}
println!("连接失败,第{}次重试...", retries);
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
}