Rust NOSTR协议中继池库nostr-relay-pool的使用:实现高效分布式消息中继与网络连接管理

Rust NOSTR协议中继池库nostr-relay-pool的使用:实现高效分布式消息中继与网络连接管理

功能标志

以下是可用的crate功能标志:

功能 默认 描述
tor 启用嵌入式tor客户端支持

状态

该库处于ALPHA状态,已实现的功能通常可以工作,但API将会以不兼容的方式发生变化。

捐赠

rust-nostr是免费和开源的。这意味着我们不会通过出售它来获得任何收入。相反,我们依赖于您的财务支持。如果您积极使用任何rust-nostr库/软件/服务,请考虑捐赠。

许可证

该项目采用MIT软件许可证分发 - 详情请参阅LICENSE文件

安装

在项目目录中运行以下Cargo命令:

cargo add nostr-relay-pool

或者将以下行添加到您的Cargo.toml中:

nostr-relay-pool = "0.43.0"

示例代码

以下是使用nostr-relay-pool库的完整示例:

use nostr_relay_pool::RelayPool;
use nostr::prelude::*;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    // 创建密钥对
    let keys = Keys::generate();
    
    // 初始化中继池
    let relay_pool = RelayPool::new();
    
    // 添加中继URL
    relay_pool.add_relay("wss://relay.damus.io", None).await?;
    relay_pool.add_relay("wss://nostr.wine", None).await?;
    
    // 连接到所有中继
    relay_pool.connect().await;
    
    // 创建事件
    let event = EventBuilder::new_text_note("Hello from nostr-relay-pool!", &[])
        .to_event(&keys)?;
    
    // 发送事件到所有中继
    relay_pool.send_event(event).await?;
    
    // 订阅事件
    let subscription = Filter::new()
        .kind(Kind::TextNote)
        .limit(10);
    
    relay_pool.subscribe("my-subscription", vec![subscription]).await;
    
    // 处理接收到的消息
    while let Ok(notification) = relay_pool.notifications().recv().await {
        match notification {
            RelayPoolNotification::Event(_sub_id, event) => {
                println!("Received event: {:?}", event);
            },
            RelayPoolNotification::Message(_relay_url, message) => {
                println!("Received message: {:?}", message);
            },
            _ => {}
        }
    }
    
    // 等待一段时间
    sleep(Duration::from_secs(30)).await;
    
    // 断开所有中继连接
    relay_pool.disconnect().await?;
    
    Ok(())
}

这个示例展示了:

  1. 创建密钥对
  2. 初始化中继池
  3. 添加多个中继服务器
  4. 连接到所有中继
  5. 创建并发送事件
  6. 订阅和接收事件
  7. 处理不同类型的通知
  8. 断开连接

完整示例代码

以下是基于上述内容的完整示例demo:

// 引入必要的库
use nostr_relay_pool::RelayPool;
use nostr::prelude::*;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    // 1. 创建密钥对 - 用于签名事件
    let keys = Keys::generate();
    println!("生成公钥: {}", keys.public_key());
    
    // 2. 初始化中继池
    let relay_pool = RelayPool::new();
    
    // 3. 添加多个中继服务器
    let relays = vec![
        "wss://relay.damus.io",
        "wss://nostr.wine",
        "wss://nos.lol"
    ];
    
    for relay in relays {
        match relay_pool.add_relay(relay, None).await {
            Ok(_) => println!("成功添加中继: {}", relay),
            Err(e) => eprintln!("添加中继失败 ({}): {}", relay, e),
        }
    }
    
    // 4. 连接到所有中继
    println!("连接中继中...");
    relay_pool.connect().await;
    
    // 5. 创建并发送文本事件
    println!("准备发送事件...");
    let event = EventBuilder::new_text_note("Hello from nostr-relay-pool demo!", &[])
        .to_event(&keys)?;
    
    match relay_pool.send_event(event).await {
        Ok(_) => println!("事件发送成功"),
        Err(e) => eprintln!("事件发送失败: {}", e),
    }
    
    // 6. 订阅文本事件
    println!("订阅文本事件...");
    let subscription = Filter::new()
        .kind(Kind::TextNote)
        .limit(10);
    
    relay_pool.subscribe("text-note-subscription", vec![subscription]).await;
    
    // 7. 处理接收到的消息
    println!("开始监听事件...");
    tokio::spawn(async move {
        while let Ok(notification) = relay_pool.notifications().recv().await {
            match notification {
                RelayPoolNotification::Event(sub_id, event) => {
                    println!("[{}] 收到事件 ({}): {}", 
                        sub_id, 
                        event.kind, 
                        event.content
                    );
                },
                RelayPoolNotification::Message(relay_url, message) => {
                    println!("来自 {} 的消息: {:?}", relay_url, message);
                },
                RelayPoolNotification::RelayStatus(url, status) => {
                    println!("中继状态变化: {} -> {:?}", url, status);
                },
                _ => {}
            }
        }
    });
    
    // 8. 等待一段时间让程序运行
    println!("运行30秒...");
    sleep(Duration::from_secs(30)).await;
    
    // 9. 断开所有中继连接
    println!("断开中继连接...");
    relay_pool.disconnect().await?;
    
    println!("程序结束");
    Ok(())
}

这个完整示例扩展了原始示例,增加了:

  1. 更多错误处理和日志输出
  2. 添加了多个中继服务器的循环处理
  3. 增加了对中继状态变化的监听
  4. 使用tokio::spawn来异步处理通知
  5. 更详细的运行状态输出

注意:该库仍处于ALPHA阶段,API可能会发生变化。使用时请参考最新文档。


1 回复

Rust NOSTR协议中继池库nostr-relay-pool的使用指南

介绍

nostr-relay-pool是一个用于NOSTR协议的Rust库,它提供了高效管理多个中继服务器连接的能力。NOSTR(Notes and Other Stuff Transmitted by Relays)是一个简单的开放协议,用于去中心化社交网络,而中继服务器是该协议的核心组件。

这个库的主要功能包括:

  • 管理多个中继连接池
  • 自动重连机制
  • 消息订阅和发布
  • 高效的事件过滤

安装

在Cargo.toml中添加依赖:

[dependencies]
nostr-relay-pool = "0.5"  # 请使用最新版本
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 创建中继池

use nostr_relay_pool::{RelayPool, RelayPoolNotification};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建中继URL列表
    let relay_urls = vec![
        "wss://relay.damus.io".to_string(),
        "wss://nostr-pub.wellorder.net".to_string(),
        "wss://relay.snort.social".to_string(),
    ];
    
    // 创建中继池
    let (notification_tx, mut notification_rx) = mpsc::channel(100);
    let pool = RelayPool::new(notification_tx);
    
    // 添加中继
    for url in relay_urls {
        pool.add_relay(&url, None).await.unwrap();
    }
    
    // 启动中继池
    pool.connect().await;
}

2. 订阅事件

use nostr::Filter;

// 创建过滤器 - 订阅特定作者的最新笔记
let filter = Filter::new()
    .authors(vec![
        "32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245".parse().unwrap()
    ])
    .limit(10);

// 订阅
let sub_id = "my-subscription-id".to_string();
pool.subscribe(vec![filter], Some(sub_id.clone()).await;

3. 处理通知

// 在一个单独的task中处理通知
tokio::spawn(async move {
    while let Some(notification) = notification_rx.recv().await {
        match notification {
            RelayPoolNotification::Event(sub_id, event) => {
                println!("收到事件 {}: {}", sub_id, event.content);
            }
            RelayPoolNotification::Message(message) => {
                println!("中继消息: {:?}", message);
            }
            RelayPoolNotification::RelayStatus(url, status) => {
                println!("中继 {} 状态变化: {:?}", url, status);
            }
            _ => {}
        }
    }
});

4. 发布事件

use nostr::{Event, Keys};

// 创建密钥
let keys = Keys::generate();

// 创建事件
let event = Event::new_text_note("Hello from Rust nostr-relay-pool!", &keys).unwrap();

// 发布到所有中继
pool.publish_event(event).await.unwrap();

高级功能

1. 自定义连接选项

use nostr_relay_pool::RelayOptions;

let options = RelayOptions::new()
    .read(true)    // 只读
    .write(true)   // 可写
    .retry_secs(5) // 重试间隔5秒
    .timeout_secs(10); // 超时10秒

pool.add_relay("wss://custom.relay", Some(options)).await.unwrap();

2. 批量操作

// 批量添加中继
pool.add_relays(vec![
    ("wss://relay1.com".to_string(), None),
    ("wss://relay2.com".to_string(), None),
]).await.unwrap();

// 批量取消订阅
pool.unsubscribe(vec!["sub-id-1".to_string(), "sub-id-2".to_string()]).await;

3. 连接状态监控

// 获取所有中继状态
let statuses = pool.relay_statuses().await;
for (url, status) in statuses {
    println!("{}: {:?}", url, status);
}

// 断开特定中继
pool.disconnect_relay("wss://relay.damus.io").await.unwrap();

最佳实践

  1. 错误处理:始终处理可能出现的错误,特别是网络操作
  2. 资源清理:不再需要时取消订阅并断开连接
  3. 连接管理:监控中继状态并处理重连逻辑
  4. 性能考虑:合理设置过滤器以避免过多不必要的事件

完整示例

use nostr_relay_pool::{RelayPool, RelayPoolNotification};
use nostr::{Filter, Keys, Event};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 初始化
    let (notification_tx, mut notification_rx) = mpsc::channel(100);
    let pool = RelayPool::new(notification_tx);
    
    // 添加中继
    pool.add_relays(vec![
        ("wss://relay.damus.io".to_string(), None),
        ("wss://nostr-pub.wellorder.net".to_string(), None),
    ]).await?;
    
    // 连接
    pool.connect().await;
    
    // 订阅
    let filter = Filter::new().limit(10);
    let sub_id = "example-sub".to_string();
    pool.subscribe(vec![filter], Some(sub_id.clone())).await;
    
    // 处理通知
    tokio::spawn(async move {
        while let Some(notification) = notification_rx.recv().await {
            match notification {
                RelayPoolNotification::Event(sub_id, event) => {
                    println!("[{}] 新事件: {}", sub_id, event.content);
                }
                RelayPoolNotification::RelayStatus(url, status) => {
                    println!("中继状态更新: {} -> {:?}", url, status);
                }
                _ => {}
            }
        }
    });
    
    // 发布事件
    let keys = Keys::generate();
    let event = Event::new_text_note("测试消息", &keys)?;
    pool.publish_event(event).await?;
    
    // 保持运行
    tokio::signal::ctrl_c().await?;
    Ok(())
}
回到顶部