Rust异步编程工具库postage的使用,postage提供强大的Stream和Sink扩展功能
功能丰富、可移植的异步通道库
为什么使用Postage?
- 包含丰富的通道集合
- 可与任何执行器配合使用
- 目前针对
tokio
和async-std
进行了回归测试 - 使用
futures-traits
功能时,通道实现了futures的Sink/Stream
特性
- 目前针对
- 经过全面测试
- 通道具有完整的单元测试覆盖率,并与多个异步执行器进行了集成测试
- 包含内置的Sink和Stream组合器
- Sink可以被链式连接和过滤
- Stream可以被链式连接、过滤、映射和合并
- Sink和Stream可以记录它们的值,便于应用程序调试
通道
postage::barrier
屏障通道可用于同步事件,但不传输任何数据。当发送器被丢弃(或调用tx.send(())
)时,接收器被唤醒。这可用于在任务之间异步协调操作。
postage::broadcast 广播通道在多个发送器和多个接收器之间提供可靠的广播传递。通道具有固定容量,如果缓冲区已满,发送器将被挂起。
当接收器被克隆时,两个接收器将接收相同的消息序列。
发送器还提供subscribe()
方法,该方法创建一个接收器,该接收器将观察调用subscribe之后发送的所有消息。
postage::dispatch 分发通道提供多发送器、多接收器的消息分发。一条消息最多被一个接收器观察到。通道具有固定容量,如果缓冲区已满,发送器将被挂起。
可以通过rx.clone()
或tx.subscribe()
创建接收器。
postage::mpsc Postage包含一个固定容量的多生产者、单消费者通道。生产者可以被克隆,如果通道已满,发送器任务将被挂起。
postage::oneshot
一次性通道在发送器和接收器之间传输单个值。两者都不能被克隆。如果发送器被丢弃,接收器将收到None
值。
postage::watch 观察通道可用于异步传输状态。当接收器被创建时,它们立即接收一个初始值。它们还将接收新值,但不保证接收每个值。
通过观察通道传输的值必须实现Default。实现这一点的一个简单方法是传输Option<T>
。
基准测试 Postage通道以及可比较的async-std/tokio通道的基准测试。
send/recv
测量发送和接收一个项目的总时间send full
测量在满通道上发送一个项目并获得Poll::Pending
值的时间recv empty
测量在空通道上获得Poll::Pending
值的时间
所有基准测试均使用criterion进行,位于benches
目录中。
包 | 通道 | send/recv | send full | recv empty |
---|---|---|---|---|
broadcast | postage | 114ns | 7ns | 8ns |
broadcast | tokio | 98ns (-14%) | 54ns | 37ns |
- | ||||
dispatch | postage | 80ns | 26ns | 25ns |
dispatch | async_std | 41ns (-48%) | 10ns | 11ns |
- | ||||
mpsc | postage | 83ns | 27ns | 30ns |
mpsc | tokio | 85ns (+1%) | 2ns | 35ns |
- | ||||
watch | postage | 96ns | - | 7ns |
watch | tokio | 73ns (-23%) | - | 75ns |
// 完整示例:使用postage的mpsc通道和Stream/Sink功能
use postage::mpsc;
use postage::prelude::*;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 创建容量为10的mpsc通道
let (mut tx, mut rx) = mpsc::channel(10);
// 生产者任务
tokio::spawn(async move {
for i in 0..5 {
// 使用Sink的send方法发送数据
tx.send(i).await.unwrap();
println!("Sent: {}", i);
sleep(Duration::from_millis(100)).await;
}
// 关闭发送端
tx.close();
});
// 消费者任务 - 使用Stream功能
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
println!("Channel closed");
}
// 使用postage的broadcast通道和Stream组合器示例
use postage::broadcast;
use postage::prelude::*;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 创建容量为5的广播通道
let (tx, mut rx1) = broadcast::channel(5);
// 克隆接收器
let mut rx2 = rx1.clone();
// 发送器任务
tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
println!("Broadcast: {}", i);
sleep(Duration::from_millis(50)).await;
}
});
// 接收器1任务 - 使用Stream的map组合器
tokio::spawn(async move {
while let Some(value) = rx1.recv().await {
let mapped = value * 2;
println!("Receiver 1 mapped: {} -> {}", value, mapped);
}
});
// 接收器2任务 - 使用Stream的filter组合器
tokio::spawn(async move {
while let Some(value) = rx2.recv().await {
if value % 2 == 0 {
println!("Receiver 2 filtered: {}", value);
}
}
});
sleep(Duration::from_secs(2)).await;
}
// 使用postage的Sink日志记录功能示例
use postage::mpsc;
use postage::sink::Sink;
use postage::prelude::*;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = mpsc::channel(5);
// 使用log_sink添加日志记录
let mut logging_tx = tx.log_sink(|value| {
println!("[LOG] Sending value: {}", value);
});
// 发送数据
tokio::spawn(async move {
for i in 0..3 {
logging_tx.send(i).await.unwrap();
sleep(Duration::from_millis(200)).await;
}
});
// 接收数据
while let Some(value) = rx.recv().await {
println!("Main received: {}", value);
}
}
完整示例demo:
// 完整示例:综合使用Postage多种通道类型
use postage::{barrier, broadcast, dispatch, mpsc, oneshot, watch};
use postage::prelude::*;
use tokio::time::{sleep, Duration};
use std::sync::Arc;
#[tokio::main]
async fn main() {
println!("=== Postage通道库综合示例 ===");
// 示例1: 使用mpsc通道进行基本通信
println!("\n1. MPSC通道示例:");
let (mut mpsc_tx, mut mpsc_rx) = mpsc::channel(3);
tokio::spawn(async move {
for i in 0..3 {
mpsc_tx.send(format!("消息{}", i)).await.unwrap();
sleep(Duration::from_millis(100)).await;
}
});
while let Some(msg) = mpsc_rx.recv().await {
println!("收到MPSC消息: {}", msg);
}
// 示例2: 使用broadcast广播通道
println!("\n2. 广播通道示例:");
let (broadcast_tx, broadcast_rx1) = broadcast::channel(5);
let mut broadcast_rx2 = broadcast_rx1.clone();
// 广播发送任务
let broadcast_tx_clone = broadcast_tx.clone();
tokio::spawn(async move {
for i in 0..5 {
broadcast_tx_clone.send(i).await.unwrap();
sleep(Duration::from_millis(50)).await;
}
});
// 接收器1
tokio::spawn(async move {
while let Some(value) = broadcast_rx1.recv().await {
println!("广播接收器1: {}", value);
}
});
// 接收器2
tokio::spawn(async move {
while let Some(value) = broadcast_rx2.recv().await {
println!("广播接收器2: {}", value);
}
});
sleep(Duration::from_millis(300)).await;
// 示例3: 使用oneshot一次性通道
println!("\n3. 一次性通道示例:");
let (oneshot_tx, oneshot_rx) = oneshot::channel();
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
oneshot_tx.send("一次性消息".to_string()).unwrap();
});
if let Some(msg) = oneshot_rx.recv().await {
println!("收到一次性消息: {}", msg);
}
// 示例4: 使用watch观察通道
println!("\n4. 观察通道示例:");
let (watch_tx, watch_rx) = watch::channel(Some(0));
// 状态更新任务
let watch_tx_clone = watch_tx.clone();
tokio::spawn(async move {
for i in 1..=3 {
sleep(Duration::from_millis(150)).await;
watch_tx_clone.send(Some(i)).await.unwrap();
}
});
// 状态观察任务
let mut watch_rx_clone = watch_rx.clone();
tokio::spawn(async move {
while let Some(value) = watch_rx_clone.recv().await {
if let Some(v) = value {
println!("观察通道状态更新: {}", v);
}
}
});
sleep(Duration::from_millis(500)).await;
// 示例5: 使用barrier屏障同步
println!("\n5. 屏障通道示例:");
let (barrier_tx, mut barrier_rx) = barrier::channel();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(200)).await;
println!("任务完成,发送同步信号");
barrier_tx_clone.send(()).await.unwrap();
});
barrier_rx.recv().await;
println!("所有任务同步完成!");
// 示例6: 使用Sink和Stream组合器
println!("\n6. Sink和Stream组合器示例:");
let (mut sink_tx, mut stream_rx) = mpsc::channel(10);
// 使用log_sink记录发送的数据
let mut logging_sink = sink_tx.log_sink(|value| {
println!("[DEBUG] 发送值: {}", value);
});
// 使用map组合器处理接收的数据
let mapped_stream = stream_rx.map(|x: i32| x * 2);
tokio::spawn(async move {
for i in 0..3 {
logging_sink.send(i).await.unwrap();
sleep(Duration::from_millis(100)).await;
}
});
let mut mapped_rx = mapped_stream;
while let Some(value) = mapped_rx.recv().await {
println!("映射后的值: {}", value);
}
println!("\n=== 所有示例执行完成 ===");
sleep(Duration::from_secs(1)).await;
}
Rust异步编程工具库postage的使用指南
介绍
postage是一个强大的Rust异步编程工具库,专注于提供Stream和Sink的扩展功能。它建立在futures库之上,为异步编程提供了更简洁、更易用的API接口。postage特别适合需要处理异步数据流和管道的应用场景。
主要特性
- 丰富的Stream扩展方法
- 强大的Sink组合功能
- 简洁的错误处理机制
- 与async/await语法完美集成
安装方法
在Cargo.toml中添加依赖:
[dependencies]
postage = "0.5"
futures = "0.3"
基本使用方法
Stream扩展示例
use postage::prelude::*;
use futures::stream;
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![1, 2, 3, 4, 5])
.map(|x| x * 2)
.filter(|x| *x > 5);
while let Some(value) = stream.recv().await {
println!("Received: {}", value);
}
}
Sink使用示例
use postage::sink::Sink;
use futures::sink;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut sink = sink::drain().sink_map_err(|_| "Mapping error");
sink.send("Hello").await?;
sink.send("World").await?;
Ok(())
}
组合Stream和Sink
use postage::prelude::*;
use futures::{stream, sink};
#[tokio::main]
async fn main() {
let source = stream::iter(0..10);
let mut sink = sink::drain();
source.forward(&mut sink).await.unwrap();
}
错误处理示例
use postage::prelude::*;
use futures::stream;
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![Ok(1), Err("error"), Ok(3)])
.map_err(|e| println!("Error: {}", e))
.filter_map(|x| async move { x.ok() });
while let Some(value) = stream.recv().await {
println!("Value: {}", value);
}
}
高级功能
批量处理
use postage::prelude::*;
use futures::stream;
#[tokio::main]
async fn main() {
let mut stream = stream::iter(0..100)
.chunks(10);
while let Some(batch) = stream.recv().await {
println!("Batch: {:?}", batch);
}
}
超时控制
use postage::prelude::*;
use futures::stream;
use std::time::Duration;
#[tokio::main]
async fn main() {
let mut stream = stream::iter(0..5)
.timeout(Duration::from_secs(1));
while let Some(result) = stream.recv().await {
match result {
Ok(value) => println!("Value: {}", value),
Err(_) => println!("Timeout occurred"),
}
}
}
完整示例demo
// 完整示例:使用postage库处理异步数据流
use postage::prelude::*;
use futures::{stream, sink};
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== Stream扩展功能示例 ===");
// 创建基础流并应用postage扩展方法
let mut number_stream = stream::iter(1..=10)
.map(|x| x * 2) // 将每个值乘以2
.filter(|x| *x > 5) // 过滤大于5的值
.timeout(Duration::from_millis(500)); // 设置超时控制
while let Some(result) = number_stream.recv().await {
match result {
Ok(value) => println!("处理后的数值: {}", value),
Err(_) => println!("处理超时"),
}
}
println!("\n=== Sink功能示例 ===");
// 创建Sink并发送数据
let mut message_sink = sink::drain()
.sink_map_err(|_| "Sink映射错误"); // 错误映射
message_sink.send("第一条消息").await?;
message_sink.send("第二条消息").await?;
println!("消息发送完成");
println!("\n=== 批量处理示例 ===");
// 批量处理数据
let mut batch_stream = stream::iter(0..25)
.chunks(5) // 每5个元素一批
.map(|batch| {
println!("处理批次: {:?}", batch);
batch.into_iter().sum::<i32>()
});
while let Some(sum) = batch_stream.recv().await {
println!("批次总和: {}", sum);
}
println!("\n=== 错误处理示例 ===");
// 错误处理演示
let mut error_stream = stream::iter(vec![
Ok("成功数据1"),
Err("模拟错误"),
Ok("成功数据2"),
Err("另一个错误"),
Ok("成功数据3")
])
.map_err(|e: &str| println!("捕获到错误: {}", e)) // 错误处理
.filter_map(|x| async move { x.ok() }); // 过滤掉错误,只保留成功数据
while let Some(value) = error_stream.recv().await {
println!("有效数据: {}", value);
}
println!("\n=== 组合Stream和Sink示例 ===");
// 组合使用Stream和Sink
let data_source = stream::iter(vec!["数据A", "数据B", "数据C", "数据D"]);
let mut data_sink = sink::drain()
.sink_map_err(|_| "数据处理错误");
// 将流中的数据转发到sink
match data_source.forward(&mut data_sink).await {
Ok(_) => println!("数据转发完成"),
Err(e) => println!("转发错误: {}", e),
}
Ok(())
}
注意事项
- 确保正确导入postage的prelude模块以使用所有扩展方法
- 在处理错误时注意错误类型的转换
- 合理使用超时和背压控制以避免资源耗尽
postage库通过提供简洁的API和强大的组合能力,大大简化了Rust异步编程的复杂度,特别是在处理数据流管道时表现出色。