Rust异步工具库tor-async-utils的使用,为Tor网络提供高效异步操作支持
Rust异步工具库tor-async-utils的使用,为Tor网络提供高效异步操作支持
tor-async-utils介绍
tor-async-utils是一个为Tor项目提供底层异步工具支持的Rust库。它主要服务于tor-*
和arti-*
系列的Tor实现相关crate。
这个crate位于Tor crate栈的底层位置。如果其中任何功能被认为具有普遍用途,应该考虑将其移出到独立的crate中。
许可证:MIT OR Apache-2.0
安装方式
在项目目录中运行以下Cargo命令:
cargo add tor-async-utils
或者在Cargo.toml中添加:
tor-async-utils = "0.32.0"
示例代码
以下是一个使用tor-async-utils进行异步操作的完整示例:
use futures::future::{ready, FutureExt};
use tor_async_utils::oneshot;
#[tokio::main]
async fn main() {
// 创建oneshot通道
let (sender, receiver) = oneshot::channel();
// 在另一个任务中发送消息
tokio::spawn(async move {
sender.send("Hello from Tor network").unwrap();
});
// 接收消息
match receiver.await {
Ok(msg) => println!("Received: {}", msg),
Err(_) => println!("Sender dropped before sending"),
}
// 使用PostageWatchExt扩展
let (watch_sender, watch_receiver) = postage::watch::channel();
tokio::spawn(async move {
watch_sender.broadcast(42).await;
});
// 使用watch_receiver作为Stream
let value = watch_receiver
.into_stream()
.filter_map(|x| ready(x))
.next()
.await;
println!("Watched value: {:?}", value);
}
这个示例展示了:
- 使用tor-async-utils提供的oneshot通道进行简单消息传递
- 使用PostageWatchExt扩展将watch接收器转换为Stream
- 结合Tokio运行时进行异步操作
完整示例demo
以下是基于tor-async-utils的完整示例,展示了更复杂的异步操作场景:
use futures::stream::StreamExt;
use tor_async_utils::{oneshot, postage};
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
// 示例1: 使用oneshot进行任务协调
let (task_sender, task_receiver) = oneshot::channel();
let worker = tokio::spawn(async move {
println!("Worker started, waiting for task...");
match task_receiver.await {
Ok(task) => println!("Worker received task: {}", task),
Err(_) => println!("Task was cancelled before sending"),
}
});
sleep(Duration::from_secs(1)).await;
task_sender.send("Process network data").unwrap();
worker.await.unwrap();
// 示例2: 使用watch通道监控配置变化
let (config_sender, config_receiver) = postage::watch::channel::<String>("initial_config".to_string());
tokio::spawn(async move {
for i in 1..=3 {
sleep(Duration::from_secs(2)).await;
let new_config = format!("config_v{}", i);
println!("Updating config to: {}", new_config);
config_sender.broadcast(new_config).await;
}
});
// 监听配置变化
config_receiver
.into_stream()
.take(3) // 只接收3次更新
.for_each(|config| {
println!("Config changed to: {:?}", config);
futures::future::ready(())
})
.await;
println!("All examples completed");
}
主要功能
- 提供Tor项目所需的异步原语和工具
- 包含oneshot通道实现
- 提供watch类型的Stream转换支持
- 为Tor网络操作提供高效的异步支持
项目维护
该库由Tor项目核心团队维护,主要贡献者包括:
- Gabi Moldovan
- Ian Jackson
- Nick Mathewson
- David Goulet
1 回复
Rust异步工具库tor-async-utils的使用 - 为Tor网络提供高效异步操作支持
简介
tor-async-utils
是一个专为Tor网络设计的Rust异步工具库,它提供了一系列实用工具和抽象,帮助开发者更高效地构建与Tor网络交互的异步应用程序。该库建立在Tokio异步运行时之上,针对Tor网络的特殊需求进行了优化。
主要功能
- 提供连接Tor网络的异步工具
- 实现高效的异步流处理
- 提供超时和重试机制
- 支持异步任务管理
安装
在Cargo.toml中添加依赖:
[dependencies]
tor-async-utils = "0.4"
tokio = { version = "1.0", features = ["full"] }
使用示例
基本用法
use tor_async_utils::oneshot;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 创建一个oneshot通道
let (sender, receiver) = oneshot::channel();
tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
sender.send("Hello from Tor!").unwrap();
});
match receiver.await {
Ok(msg) => println!("Received: {}", msg),
Err(e) => eprintln!("Error receiving message: {}", e),
}
}
带超时的Tor请求
use tor_async_utils::timeout::timeout;
use std::time::Duration;
async fn fetch_from_tor() -> Result<String, &'static str> {
// 模拟Tor网络请求
tokio::time::sleep(Duration::from_secs(2)).await;
Ok("Tor response data".to_string())
}
#[tokio::main]
async fn main() {
match timeout(Duration::from_secs(1), fetch_from_tor()).await {
Ok(Ok(data)) => println!("Got data: {}", data),
Ok(Err(e)) => eprintln!("Request failed: {}", e),
Err(_) => eprintln!("Request timed out"),
}
}
异步流处理
use tor_async_utils::stream::StreamExt;
use futures::stream;
#[tokio::main]
async fn main() {
let numbers = stream::iter(1..=10);
let processed = numbers
.map(|n| async move { n * 2 })
.buffered(3); // 并发处理3个item
let results: Vec<_> = processed.collect().await;
println!("Processed results: {:?}", results);
}
高级特性
重试机制
use tor_async_utils::retry::{retry, RetryDelay};
use std::time::Duration;
async fn unreliable_tor_operation(attempt: u32) -> Result<(), &'static str> {
if attempt < 3 {
Err("Temporary failure")
} else {
Ok(())
}
}
#[tokio::main]
async fn main() {
let retry_policy = RetryDelay::exponential(
Duration::from_secs(1),
Duration::from_secs(30),
2.0
);
match retry(retry_policy, unreliable_tor_operation).await {
Ok(_) => println!("Operation succeeded"),
Err(e) => eprintln!("Failed after retries: {}", e),
}
}
任务管理
use tor_async_utils::task::TaskManager;
use std::time::Duration;
#[tokio::main]
async fn main() {
let task_manager = TaskManager::new();
// 添加任务
task_manager.spawn("background-task", async {
tokio::time::sleep(Duration::from_secs(2)).await;
println!("Background task completed");
});
// 等待所有任务完成
task_manager.join_all().await;
println!("All tasks finished");
}
完整示例
下面是一个结合多个特性的完整示例,展示了如何使用tor-async-utils
进行Tor网络请求:
use tor_async_utils::{
oneshot,
timeout::timeout,
retry::{retry, RetryDelay},
task::TaskManager,
stream::StreamExt
};
use futures::stream;
use std::time::Duration;
use tokio::time::{sleep};
// 模拟Tor网络请求
async fn tor_request(url: &str, attempt: u32) -> Result<String, &'static str> {
println!("Attempt {}: Requesting {}", attempt, url);
// 模拟网络延迟
sleep(Duration::from_millis(500)).await;
// 模拟前两次失败,第三次成功
if attempt < 3 {
Err("Temporary network failure")
} else {
Ok(format!("Response from {}", url))
}
}
#[tokio::main]
async fn main() {
// 创建任务管理器
let task_manager = TaskManager::new();
// 添加第一个任务 - 处理多个URL
task_manager.spawn("url-processor", async {
let urls = vec!["tor://example1.onion", "tor://example2.onion"];
// 使用流处理并发请求
let stream = stream::iter(urls)
.map(|url| async move {
// 设置重试策略
let retry_policy = RetryDelay::exponential(
Duration::from_secs(1),
Duration::from_secs(5),
2.0
);
// 带重试的请求
match retry(retry_policy, |attempt| tor_request(url, attempt)).await {
Ok(response) => println!("Success: {}", response),
Err(e) => eprintln!("Failed after retries: {}", e),
}
})
.buffered(2); // 并发2个请求
stream.collect::<Vec<_>>().await;
});
// 添加第二个任务 - 带超时的oneshot通信
task_manager.spawn("oneshot-demo", async {
let (sender, receiver) = oneshot::channel();
tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
sender.send("Message from Tor").unwrap();
});
// 设置500ms超时
match timeout(Duration::from_millis(500), receiver).await {
Ok(Ok(msg)) => println!("Received: {}", msg),
Ok(Err(e)) => eprintln!("Channel error: {}", e),
Err(_) => eprintln!("Timeout waiting for message"),
}
});
// 等待所有任务完成
task_manager.join_all().await;
println!("All tasks completed");
}
最佳实践
- 为Tor网络操作设置合理的超时时间
- 使用提供的重试机制处理网络不稳定性
- 合理控制并发量以避免被Tor节点限制
- 使用任务管理器管理长期运行的异步任务
注意事项
- 该库主要针对Tor网络特性优化,普通网络应用可能不需要这些工具
- 使用时需要熟悉Tokio异步运行时
- 生产环境中应仔细调整超时和重试参数
通过tor-async-utils
,开发者可以更轻松地构建稳定高效的Tor网络应用,专注于业务逻辑而不是底层异步处理细节。