Rust区块链库yellowstone-grpc-proto的使用:高性能GRPC协议实现与Solana节点交互

Metadata

包标识:cargo/yellowstone-grpc-proto@9.0.0

20天前更新

2021版本

Apache-2.0许可证

565 KiB大小

安装

在项目目录中运行以下Cargo命令:

cargo add yellowstone-grpc-proto

或在Cargo.toml中添加以下行:

yellowstone-grpc-proto = “9.0.0”

主页 triton.one

文档 docs.rs/yellowstone-grpc-proto/9.0.0

代码库 github.com/rpcpool/yellowstone-grpc

维护者 Kyle Espinola Linus Kendall lvboudre Leafaar

完整示例代码:

// Cargo.toml
[dependencies]
yellowstone-grpc-proto = "9.0.0"
tokio = { version = "1.0", features = ["full"] }
tonic = "0.9"
prost = "0.12"

// main.rs
use tonic::transport::Channel;
use yellowstone_grpc_proto::prelude::{
    geyser_client::GeyserClient, SubscribeRequest, SubscribeUpdate,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接到Solana节点的gRPC端点
    let channel = Channel::from_static("http://localhost:10000")
        .connect()
        .await?;
    
    // 创建gRPC客户端
    let mut client = GeyserClient::new(channel);
    
    // 创建订阅请求
    let request = tonic::Request::new(SubscribeRequest {
        // 配置订阅参数
        accounts: vec![],
        slots: vec![],
        transactions: vec![],
        transactions_status: vec![],
        blocks: vec![],
        blocks_meta: vec![],
        entry: vec![],
        commitment: Some("confirmed".to_string()),
        accounts_data_slice: vec![],
    });
    
    // 发送订阅请求并处理响应流
    let mut stream = client.subscribe(request).await?.into_inner();
    
    // 处理接收到的更新
    while let Some(update) = stream.message().await? {
        match update {
            SubscribeUpdate::Account(account_update) => {
                println!("Received account update: {:?}", account_update);
            }
            SubscribeUpdate::Slot(slot_update) => {
                println!("Received slot update: {:?}", slot_update);
            }
            SubscribeUpdate::Transaction(transaction_update) => {
                println!("Received transaction update: {:?}", transaction_update);
            }
            SubscribeUpdate::Block(block_update) => {
                println!("Received block update: {:?}", block_update);
            }
            _ => {}
        }
    }
    
    Ok(())
}

1 回复

Rust区块链库yellowstone-grpc-proto使用指南

概述

yellowstone-grpc-proto是一个高性能的Rust库,专门用于通过GRPC协议与Solana区块链节点进行交互。该库提供了完整的协议缓冲区定义和客户端实现,支持Solana节点的实时数据订阅和交易处理。

主要特性

  • 完整的Solana GRPC协议支持
  • 高性能异步通信
  • 类型安全的Rust接口
  • 实时账户和交易订阅
  • 区块和交易数据获取

安装方法

在Cargo.toml中添加依赖:

[dependencies]
yellowstone-grpc-proto = "0.1"
tokio = { version = "1.0", features = ["full"] }
prost = "0.11"

基本使用方法

1. 建立连接

use yellowstone_grpc_proto::prelude::*;
use yellowstone_grpc_proto::geyser_grpc_client::GeyserGrpcClient;
use tonic::transport::Channel;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let channel = Channel::from_static("http://localhost:10000")
        .connect()
        .await?;
    
    let mut client = GeyserGrpcClient::new(channel);
    Ok(())
}

2. 订阅账户更新

async fn subscribe_account_updates(
    client: &mut GeyserGrpcClient<Channel>,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = SubscribeRequest {
        accounts: vec!["9we6kjtbcZ2vy3GSLLsZTEhbAqXPTRvEyoxa8wxSqKp5".to_string()],
        accounts_data_slice: vec![],
        transactions: vec![],
        transactions_status: vec![],
        slots: vec![],
        slots_updates: vec![],
        blocks: vec![],
        blocks_meta: vec![],
        entry: vec![],
        commitment: Some(CommitmentLevel::Confirmed.into()),
    };

    let mut stream = client
        .subscribe(request)
        .await?
        .into_inner();

    while let Some(response) = stream.message().await? {
        println!("Received update: {:?}", response);
    }

    Ok(())
}

3. 获取区块信息

async fn get_block(
    client: &mut GeyserGrpcClient<Channel>,
    slot: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = GetBlockRequest {
        slot,
        encoding: Some(BlockEncoding::Json.into()),
        max_supported_transaction_version: Some(0),
        ..Default::default()
    };

    let response = client.get_block(request).await?;
    println!("Block data: {:?}", response.into_inner());
    
    Ok(())
}

高级用法示例

批量订阅多个账户

async fn subscribe_multiple_accounts(
    client: &mut GeyserGrpcClient<Channel>,
    account_pubkeys: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = SubscribeRequest {
        accounts: account_pubkeys,
        accounts_data_slice: vec![],
        transactions: vec![],
        transactions_status: vec![],
        slots: vec![],
        slots_updates: vec![],
        blocks: vec![],
        blocks_meta: vec![],
        entry: vec![],
        commitment: Some(CommitmentLevel::Finalized.into()),
    };

    let mut stream = client.subscribe(request).await?.into_inner();
    
    tokio::spawn(async move {
        while let Some(update) = stream.message().await.unwrap() {
            // 处理账户更新
            handle_account_update(update).await;
        }
    });

    Ok(())
}

async fn handle_account_update(update: SubscribeUpdate) {
    match update.update_oneof {
        Some(subscribe_update::UpdateOneof::Account(account_update)) => {
            println!("Account updated: {:?}", account_update);
        }
        _ => {}
    }
}

错误处理

async fn safe_subscribe(
    client: &mut GeyserGrpcClient<Channel>,
) -> Result<(), Box<dyn std::error::Error>> {
    match client.subscribe(SubscribeRequest::default()).await {
        Ok(response) => {
            let mut stream = response.into_inner();
            // 处理流数据
            Ok(())
        }
        Err(status) => {
            eprintln!("GRPC error: {}", status);
            Err(Box::new(status))
        }
    }
}

性能优化建议

  1. 使用连接池管理多个GRPC连接
  2. 合理设置订阅过滤器以减少不必要的数据传输
  3. 使用异步任务处理大量数据更新
  4. 实现适当的重连机制处理连接中断

注意事项

  • 需要运行支持GRPC的Solana节点
  • 确保网络连接稳定
  • 适当处理背压和流量控制
  • 注意内存使用,特别是在处理大量实时数据时

这个库为Rust开发者提供了与Solana区块链交互的高效方式,特别适合需要实时数据处理和高性能要求的区块链应用开发。

完整示例demo

// 完整示例:使用yellowstone-grpc-proto库进行Solana区块链交互
use yellowstone_grpc_proto::prelude::*;
use yellowstone_grpc_proto::geyser_grpc_client::GeyserGrpcClient;
use tonic::transport::Channel;
use std::time::Duration;
use tokio::sync::mpsc;

// 主函数
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 建立GRPC连接
    println!("正在连接到Solana节点...");
    let channel = Channel::from_static("http://localhost:10000")
        .connect_timeout(Duration::from_secs(10))
        .connect()
        .await?;
    
    let mut client = GeyserGrpcClient::new(channel);
    println!("连接成功!");

    // 示例1:订阅特定账户更新
    println!("开始订阅账户更新...");
    let account_pubkeys = vec![
        "9we6kjtbcZ2vy3GSLLsZTEhbAqXPTRvEyoxa8wxSqKp5".to_string(),
        "另一个账户公钥".to_string(), // 替换为实际的账户公钥
    ];

    // 创建通道用于处理更新
    let (tx, mut rx) = mpsc::channel(100);

    // 启动订阅任务
    let subscribe_handle = tokio::spawn(async move {
        if let Err(e) = subscribe_multiple_accounts(&mut client, account_pubkeys, tx).await {
            eprintln!("订阅失败: {}", e);
        }
    });

    // 处理接收到的更新
    while let Some(update) = rx.recv().await {
        handle_account_update(update).await;
    }

    // 等待订阅任务完成
    let _ = subscribe_handle.await;

    Ok(())
}

// 订阅多个账户更新
async fn subscribe_multiple_accounts(
    client: &mut GeyserGrpcClient<Channel>,
    account_pubkeys: Vec<String>,
    tx: mpsc::Sender<SubscribeUpdate>,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = SubscribeRequest {
        accounts: account_pubkeys,
        accounts_data_slice: vec![],
        transactions: vec![],
        transactions_status: vec![],
        slots: vec![],
        slots_updates: vec![],
        blocks: vec![],
        blocks_meta: vec![],
        entry: vec![],
        commitment: Some(CommitmentLevel::Finalized.into()),
    };

    let mut stream = client.subscribe(request).await?.into_inner();
    
    // 持续监听流数据
    while let Some(update) = stream.message().await? {
        // 发送更新到处理通道
        if let Err(e) = tx.send(update).await {
            eprintln!("发送更新失败: {}", e);
            break;
        }
    }

    Ok(())
}

// 处理账户更新
async fn handle_account_update(update: SubscribeUpdate) {
    match update.update_oneof {
        Some(subscribe_update::UpdateOneof::Account(account_update)) => {
            println!("账户更新接收时间: {:?}", std::time::SystemTime::now());
            println!("账户公钥: {:?}", account_update.pubkey);
            println!("账户数据长度: {} bytes", account_update.data.len());
            println!("slot: {}", account_update.slot);
            println!("---");
        }
        Some(subscribe_update::UpdateOneof::Slot(slot_update)) => {
            println!("Slot更新: {:?}", slot_update);
        }
        Some(subscribe_update::UpdateOneof::Transaction(transaction_update)) => {
            println!("交易更新: {:?}", transaction_update);
        }
        _ => {
            println!("收到未知类型的更新");
        }
    }
}

// 错误处理包装函数
async fn safe_grpc_call<F, T>(call: F) -> Result<T, Box<dyn std::error::Error>>
where
    F: std::future::Future<Output = Result<T, tonic::Status>>,
{
    match call.await {
        Ok(result) => Ok(result),
        Err(status) => {
            eprintln!("GRPC调用错误: {}", status);
            Err(Box::new(status))
        }
    }
}

// 获取区块信息的示例函数
async fn get_block_example(
    client: &mut GeyserGrpcClient<Channel>,
    slot: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = GetBlockRequest {
        slot,
        encoding: Some(BlockEncoding::Json.into()),
        max_supported_transaction_version: Some(0),
        ..Default::default()
    };

    let response = safe_grpc_call(client.get_block(request)).await?;
    let block_data = response.into_inner();
    
    println!("区块 {} 的数据:", slot);
    println!("区块哈希: {:?}", block_data.blockhash);
    println!("父区块哈希: {:?}", block_data.previous_blockhash);
    println!("包含的交易数量: {}", block_data.transactions.len());
    
    Ok(())
}

// 重连机制示例
async fn create_client_with_retry(
    endpoint: &'static str,
    max_retries: u32,
) -> Result<GeyserGrpcClient<Channel>, Box<dyn std::error::Error>> {
    let mut retries = 0;
    
    loop {
        match Channel::from_static(endpoint)
            .connect_timeout(Duration::from_secs(5))
            .connect()
            .await
        {
            Ok(channel) => {
                return Ok(GeyserGrpcClient::new(channel));
            }
            Err(e) => {
                retries += 1;
                if retries >= max_retries {
                    return Err(Box::new(e));
                }
                println!("连接失败,第{}次重试...", retries);
                tokio::time::sleep(Duration::from_secs(2)).await;
            }
        }
    }
}
回到顶部