Rust异步工具库tor-async-utils的使用,为Tor网络提供高效异步操作支持

Rust异步工具库tor-async-utils的使用,为Tor网络提供高效异步操作支持

tor-async-utils介绍

tor-async-utils是一个为Tor项目提供底层异步工具支持的Rust库。它主要服务于tor-*arti-*系列的Tor实现相关crate。

这个crate位于Tor crate栈的底层位置。如果其中任何功能被认为具有普遍用途,应该考虑将其移出到独立的crate中。

许可证:MIT OR Apache-2.0

安装方式

在项目目录中运行以下Cargo命令:

cargo add tor-async-utils

或者在Cargo.toml中添加:

tor-async-utils = "0.32.0"

示例代码

以下是一个使用tor-async-utils进行异步操作的完整示例:

use futures::future::{ready, FutureExt};
use tor_async_utils::oneshot;

#[tokio::main]
async fn main() {
    // 创建oneshot通道
    let (sender, receiver) = oneshot::channel();
    
    // 在另一个任务中发送消息
    tokio::spawn(async move {
        sender.send("Hello from Tor network").unwrap();
    });
    
    // 接收消息
    match receiver.await {
        Ok(msg) => println!("Received: {}", msg),
        Err(_) => println!("Sender dropped before sending"),
    }
    
    // 使用PostageWatchExt扩展
    let (watch_sender, watch_receiver) = postage::watch::channel();
    
    tokio::spawn(async move {
        watch_sender.broadcast(42).await;
    });
    
    // 使用watch_receiver作为Stream
    let value = watch_receiver
        .into_stream()
        .filter_map(|x| ready(x))
        .next()
        .await;
        
    println!("Watched value: {:?}", value);
}

这个示例展示了:

  1. 使用tor-async-utils提供的oneshot通道进行简单消息传递
  2. 使用PostageWatchExt扩展将watch接收器转换为Stream
  3. 结合Tokio运行时进行异步操作

完整示例demo

以下是基于tor-async-utils的完整示例,展示了更复杂的异步操作场景:

use futures::stream::StreamExt;
use tor_async_utils::{oneshot, postage};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // 示例1: 使用oneshot进行任务协调
    let (task_sender, task_receiver) = oneshot::channel();
    
    let worker = tokio::spawn(async move {
        println!("Worker started, waiting for task...");
        match task_receiver.await {
            Ok(task) => println!("Worker received task: {}", task),
            Err(_) => println!("Task was cancelled before sending"),
        }
    });
    
    sleep(Duration::from_secs(1)).await;
    task_sender.send("Process network data").unwrap();
    worker.await.unwrap();
    
    // 示例2: 使用watch通道监控配置变化
    let (config_sender, config_receiver) = postage::watch::channel::<String>("initial_config".to_string());
    
    tokio::spawn(async move {
        for i in 1..=3 {
            sleep(Duration::from_secs(2)).await;
            let new_config = format!("config_v{}", i);
            println!("Updating config to: {}", new_config);
            config_sender.broadcast(new_config).await;
        }
    });
    
    // 监听配置变化
    config_receiver
        .into_stream()
        .take(3) // 只接收3次更新
        .for_each(|config| {
            println!("Config changed to: {:?}", config);
            futures::future::ready(())
        })
        .await;
    
    println!("All examples completed");
}

主要功能

  • 提供Tor项目所需的异步原语和工具
  • 包含oneshot通道实现
  • 提供watch类型的Stream转换支持
  • 为Tor网络操作提供高效的异步支持

项目维护

该库由Tor项目核心团队维护,主要贡献者包括:

  • Gabi Moldovan
  • Ian Jackson
  • Nick Mathewson
  • David Goulet

1 回复

Rust异步工具库tor-async-utils的使用 - 为Tor网络提供高效异步操作支持

简介

tor-async-utils是一个专为Tor网络设计的Rust异步工具库,它提供了一系列实用工具和抽象,帮助开发者更高效地构建与Tor网络交互的异步应用程序。该库建立在Tokio异步运行时之上,针对Tor网络的特殊需求进行了优化。

主要功能

  1. 提供连接Tor网络的异步工具
  2. 实现高效的异步流处理
  3. 提供超时和重试机制
  4. 支持异步任务管理

安装

在Cargo.toml中添加依赖:

[dependencies]
tor-async-utils = "0.4"
tokio = { version = "1.0", features = ["full"] }

使用示例

基本用法

use tor_async_utils::oneshot;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 创建一个oneshot通道
    let (sender, receiver) = oneshot::channel();
    
    tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        sender.send("Hello from Tor!").unwrap();
    });
    
    match receiver.await {
        Ok(msg) => println!("Received: {}", msg),
        Err(e) => eprintln!("Error receiving message: {}", e),
    }
}

带超时的Tor请求

use tor_async_utils::timeout::timeout;
use std::time::Duration;

async fn fetch_from_tor() -> Result<String, &'static str> {
    // 模拟Tor网络请求
    tokio::time::sleep(Duration::from_secs(2)).await;
    Ok("Tor response data".to_string())
}

#[tokio::main]
async fn main() {
    match timeout(Duration::from_secs(1), fetch_from_tor()).await {
        Ok(Ok(data)) => println!("Got data: {}", data),
        Ok(Err(e)) => eprintln!("Request failed: {}", e),
        Err(_) => eprintln!("Request timed out"),
    }
}

异步流处理

use tor_async_utils::stream::StreamExt;
use futures::stream;

#[tokio::main]
async fn main() {
    let numbers = stream::iter(1..=10);
    
    let processed = numbers
        .map(|n| async move { n * 2 })
        .buffered(3);  // 并发处理3个item
    
    let results: Vec<_> = processed.collect().await;
    println!("Processed results: {:?}", results);
}

高级特性

重试机制

use tor_async_utils::retry::{retry, RetryDelay};
use std::time::Duration;

async fn unreliable_tor_operation(attempt: u32) -> Result<(), &'static str> {
    if attempt < 3 {
        Err("Temporary failure")
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let retry_policy = RetryDelay::exponential(
        Duration::from_secs(1), 
        Duration::from_secs(30),
        2.0
    );
    
    match retry(retry_policy, unreliable_tor_operation).await {
        Ok(_) => println!("Operation succeeded"),
        Err(e) => eprintln!("Failed after retries: {}", e),
    }
}

任务管理

use tor_async_utils::task::TaskManager;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let task_manager = TaskManager::new();
    
    // 添加任务
    task_manager.spawn("background-task", async {
        tokio::time::sleep(Duration::from_secs(2)).await;
        println!("Background task completed");
    });
    
    // 等待所有任务完成
    task_manager.join_all().await;
    println!("All tasks finished");
}

完整示例

下面是一个结合多个特性的完整示例,展示了如何使用tor-async-utils进行Tor网络请求:

use tor_async_utils::{
    oneshot,
    timeout::timeout,
    retry::{retry, RetryDelay},
    task::TaskManager,
    stream::StreamExt
};
use futures::stream;
use std::time::Duration;
use tokio::time::{sleep};

// 模拟Tor网络请求
async fn tor_request(url: &str, attempt: u32) -> Result<String, &'static str> {
    println!("Attempt {}: Requesting {}", attempt, url);
    
    // 模拟网络延迟
    sleep(Duration::from_millis(500)).await;
    
    // 模拟前两次失败,第三次成功
    if attempt < 3 {
        Err("Temporary network failure")
    } else {
        Ok(format!("Response from {}", url))
    }
}

#[tokio::main]
async fn main() {
    // 创建任务管理器
    let task_manager = TaskManager::new();
    
    // 添加第一个任务 - 处理多个URL
    task_manager.spawn("url-processor", async {
        let urls = vec!["tor://example1.onion", "tor://example2.onion"];
        
        // 使用流处理并发请求
        let stream = stream::iter(urls)
            .map(|url| async move {
                // 设置重试策略
                let retry_policy = RetryDelay::exponential(
                    Duration::from_secs(1),
                    Duration::from_secs(5),
                    2.0
                );
                
                // 带重试的请求
                match retry(retry_policy, |attempt| tor_request(url, attempt)).await {
                    Ok(response) => println!("Success: {}", response),
                    Err(e) => eprintln!("Failed after retries: {}", e),
                }
            })
            .buffered(2); // 并发2个请求
        
        stream.collect::<Vec<_>>().await;
    });
    
    // 添加第二个任务 - 带超时的oneshot通信
    task_manager.spawn("oneshot-demo", async {
        let (sender, receiver) = oneshot::channel();
        
        tokio::spawn(async move {
            sleep(Duration::from_secs(1)).await;
            sender.send("Message from Tor").unwrap();
        });
        
        // 设置500ms超时
        match timeout(Duration::from_millis(500), receiver).await {
            Ok(Ok(msg)) => println!("Received: {}", msg),
            Ok(Err(e)) => eprintln!("Channel error: {}", e),
            Err(_) => eprintln!("Timeout waiting for message"),
        }
    });
    
    // 等待所有任务完成
    task_manager.join_all().await;
    println!("All tasks completed");
}

最佳实践

  1. 为Tor网络操作设置合理的超时时间
  2. 使用提供的重试机制处理网络不稳定性
  3. 合理控制并发量以避免被Tor节点限制
  4. 使用任务管理器管理长期运行的异步任务

注意事项

  • 该库主要针对Tor网络特性优化,普通网络应用可能不需要这些工具
  • 使用时需要熟悉Tokio异步运行时
  • 生产环境中应仔细调整超时和重试参数

通过tor-async-utils,开发者可以更轻松地构建稳定高效的Tor网络应用,专注于业务逻辑而不是底层异步处理细节。

回到顶部