Rust异步编程工具库postage的使用,postage提供强大的Stream和Sink扩展功能

功能丰富、可移植的异步通道库

为什么使用Postage?

  • 包含丰富的通道集合
  • 可与任何执行器配合使用
    • 目前针对tokioasync-std进行了回归测试
    • 使用futures-traits功能时,通道实现了futures的Sink/Stream特性
  • 经过全面测试
    • 通道具有完整的单元测试覆盖率,并与多个异步执行器进行了集成测试
  • 包含内置的Sink和Stream组合器
    • Sink可以被链式连接和过滤
    • Stream可以被链式连接、过滤、映射和合并
    • Sink和Stream可以记录它们的值,便于应用程序调试

通道

postage::barrier 屏障通道可用于同步事件,但不传输任何数据。当发送器被丢弃(或调用tx.send(()))时,接收器被唤醒。这可用于在任务之间异步协调操作。

postage::broadcast 广播通道在多个发送器和多个接收器之间提供可靠的广播传递。通道具有固定容量,如果缓冲区已满,发送器将被挂起。

当接收器被克隆时,两个接收器将接收相同的消息序列。

发送器还提供subscribe()方法,该方法创建一个接收器,该接收器将观察调用subscribe之后发送的所有消息。

postage::dispatch 分发通道提供多发送器、多接收器的消息分发。一条消息最多被一个接收器观察到。通道具有固定容量,如果缓冲区已满,发送器将被挂起。

可以通过rx.clone()tx.subscribe()创建接收器。

postage::mpsc Postage包含一个固定容量的多生产者、单消费者通道。生产者可以被克隆,如果通道已满,发送器任务将被挂起。

postage::oneshot 一次性通道在发送器和接收器之间传输单个值。两者都不能被克隆。如果发送器被丢弃,接收器将收到None值。

postage::watch 观察通道可用于异步传输状态。当接收器被创建时,它们立即接收一个初始值。它们还将接收新值,但不保证接收每个值。

通过观察通道传输的值必须实现Default。实现这一点的一个简单方法是传输Option<T>

基准测试 Postage通道以及可比较的async-std/tokio通道的基准测试。

  • send/recv测量发送和接收一个项目的总时间
  • send full测量在满通道上发送一个项目并获得Poll::Pending值的时间
  • recv empty测量在空通道上获得Poll::Pending值的时间

所有基准测试均使用criterion进行,位于benches目录中。

通道 send/recv send full recv empty
broadcast postage 114ns 7ns 8ns
broadcast tokio 98ns (-14%) 54ns 37ns
-
dispatch postage 80ns 26ns 25ns
dispatch async_std 41ns (-48%) 10ns 11ns
-
mpsc postage 83ns 27ns 30ns
mpsc tokio 85ns (+1%) 2ns 35ns
-
watch postage 96ns - 7ns
watch tokio 73ns (-23%) - 75ns
// 完整示例:使用postage的mpsc通道和Stream/Sink功能
use postage::mpsc;
use postage::prelude::*;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 创建容量为10的mpsc通道
    let (mut tx, mut rx) = mpsc::channel(10);

    // 生产者任务
    tokio::spawn(async move {
        for i in 0..5 {
            // 使用Sink的send方法发送数据
            tx.send(i).await.unwrap();
            println!("Sent: {}", i);
            sleep(Duration::from_millis(100)).await;
        }
        // 关闭发送端
        tx.close();
    });

    // 消费者任务 - 使用Stream功能
    while let Some(value) = rx.recv().await {
        println!("Received: {}", value);
    }

    println!("Channel closed");
}
// 使用postage的broadcast通道和Stream组合器示例
use postage::broadcast;
use postage::prelude::*;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 创建容量为5的广播通道
    let (tx, mut rx1) = broadcast::channel(5);

    // 克隆接收器
    let mut rx2 = rx1.clone();

    // 发送器任务
    tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.unwrap();
            println!("Broadcast: {}", i);
            sleep(Duration::from_millis(50)).await;
        }
    });

    // 接收器1任务 - 使用Stream的map组合器
    tokio::spawn(async move {
        while let Some(value) = rx1.recv().await {
            let mapped = value * 2;
            println!("Receiver 1 mapped: {} -> {}", value, mapped);
        }
    });

    // 接收器2任务 - 使用Stream的filter组合器
    tokio::spawn(async move {
        while let Some(value) = rx2.recv().await {
            if value % 2 == 0 {
                println!("Receiver 2 filtered: {}", value);
            }
        }
    });

    sleep(Duration::from_secs(2)).await;
}
// 使用postage的Sink日志记录功能示例
use postage::mpsc;
use postage::sink::Sink;
use postage::prelude::*;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(5);

    // 使用log_sink添加日志记录
    let mut logging_tx = tx.log_sink(|value| {
        println!("[LOG] Sending value: {}", value);
    });

    // 发送数据
    tokio::spawn(async move {
        for i in 0..3 {
            logging_tx.send(i).await.unwrap();
            sleep(Duration::from_millis(200)).await;
        }
    });

    // 接收数据
    while let Some(value) = rx.recv().await {
        println!("Main received: {}", value);
    }
}

完整示例demo:

// 完整示例:综合使用Postage多种通道类型
use postage::{barrier, broadcast, dispatch, mpsc, oneshot, watch};
use postage::prelude::*;
use tokio::time::{sleep, Duration};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    println!("=== Postage通道库综合示例 ===");
    
    // 示例1: 使用mpsc通道进行基本通信
    println!("\n1. MPSC通道示例:");
    let (mut mpsc_tx, mut mpsc_rx) = mpsc::channel(3);
    
    tokio::spawn(async move {
        for i in 0..3 {
            mpsc_tx.send(format!("消息{}", i)).await.unwrap();
            sleep(Duration::from_millis(100)).await;
        }
    });
    
    while let Some(msg) = mpsc_rx.recv().await {
        println!("收到MPSC消息: {}", msg);
    }

    // 示例2: 使用broadcast广播通道
    println!("\n2. 广播通道示例:");
    let (broadcast_tx, broadcast_rx1) = broadcast::channel(5);
    let mut broadcast_rx2 = broadcast_rx1.clone();
    
    // 广播发送任务
    let broadcast_tx_clone = broadcast_tx.clone();
    tokio::spawn(async move {
        for i in 0..5 {
            broadcast_tx_clone.send(i).await.unwrap();
            sleep(Duration::from_millis(50)).await;
        }
    });
    
    // 接收器1
    tokio::spawn(async move {
        while let Some(value) = broadcast_rx1.recv().await {
            println!("广播接收器1: {}", value);
        }
    });
    
    // 接收器2
    tokio::spawn(async move {
        while let Some(value) = broadcast_rx2.recv().await {
            println!("广播接收器2: {}", value);
        }
    });

    sleep(Duration::from_millis(300)).await;

    // 示例3: 使用oneshot一次性通道
    println!("\n3. 一次性通道示例:");
    let (oneshot_tx, oneshot_rx) = oneshot::channel();
    
    tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        oneshot_tx.send("一次性消息".to_string()).unwrap();
    });
    
    if let Some(msg) = oneshot_rx.recv().await {
        println!("收到一次性消息: {}", msg);
    }

    // 示例4: 使用watch观察通道
    println!("\n4. 观察通道示例:");
    let (watch_tx, watch_rx) = watch::channel(Some(0));
    
    // 状态更新任务
    let watch_tx_clone = watch_tx.clone();
    tokio::spawn(async move {
        for i in 1..=3 {
            sleep(Duration::from_millis(150)).await;
            watch_tx_clone.send(Some(i)).await.unwrap();
        }
    });
    
    // 状态观察任务
    let mut watch_rx_clone = watch_rx.clone();
    tokio::spawn(async move {
        while let Some(value) = watch_rx_clone.recv().await {
            if let Some(v) = value {
                println!("观察通道状态更新: {}", v);
            }
        }
    });

    sleep(Duration::from_millis(500)).await;

    // 示例5: 使用barrier屏障同步
    println!("\n5. 屏障通道示例:");
    let (barrier_tx, mut barrier_rx) = barrier::channel();
    
    let barrier_tx_clone = barrier_tx.clone();
    tokio::spawn(async move {
        sleep(Duration::from_millis(200)).await;
        println!("任务完成,发送同步信号");
        barrier_tx_clone.send(()).await.unwrap();
    });
    
    barrier_rx.recv().await;
    println!("所有任务同步完成!");

    // 示例6: 使用Sink和Stream组合器
    println!("\n6. Sink和Stream组合器示例:");
    let (mut sink_tx, mut stream_rx) = mpsc::channel(10);
    
    // 使用log_sink记录发送的数据
    let mut logging_sink = sink_tx.log_sink(|value| {
        println!("[DEBUG] 发送值: {}", value);
    });
    
    // 使用map组合器处理接收的数据
    let mapped_stream = stream_rx.map(|x: i32| x * 2);
    
    tokio::spawn(async move {
        for i in 0..3 {
            logging_sink.send(i).await.unwrap();
            sleep(Duration::from_millis(100)).await;
        }
    });
    
    let mut mapped_rx = mapped_stream;
    while let Some(value) = mapped_rx.recv().await {
        println!("映射后的值: {}", value);
    }

    println!("\n=== 所有示例执行完成 ===");
    sleep(Duration::from_secs(1)).await;
}

1 回复

Rust异步编程工具库postage的使用指南

介绍

postage是一个强大的Rust异步编程工具库,专注于提供Stream和Sink的扩展功能。它建立在futures库之上,为异步编程提供了更简洁、更易用的API接口。postage特别适合需要处理异步数据流和管道的应用场景。

主要特性

  • 丰富的Stream扩展方法
  • 强大的Sink组合功能
  • 简洁的错误处理机制
  • 与async/await语法完美集成

安装方法

在Cargo.toml中添加依赖:

[dependencies]
postage = "0.5"
futures = "0.3"

基本使用方法

Stream扩展示例

use postage::prelude::*;
use futures::stream;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![1, 2, 3, 4, 5])
        .map(|x| x * 2)
        .filter(|x| *x > 5);
    
    while let Some(value) = stream.recv().await {
        println!("Received: {}", value);
    }
}

Sink使用示例

use postage::sink::Sink;
use futures::sink;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut sink = sink::drain().sink_map_err(|_| "Mapping error");
    
    sink.send("Hello").await?;
    sink.send("World").await?;
    
    Ok(())
}

组合Stream和Sink

use postage::prelude::*;
use futures::{stream, sink};

#[tokio::main]
async fn main() {
    let source = stream::iter(0..10);
    let mut sink = sink::drain();
    
    source.forward(&mut sink).await.unwrap();
}

错误处理示例

use postage::prelude::*;
use futures::stream;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![Ok(1), Err("error"), Ok(3)])
        .map_err(|e| println!("Error: {}", e))
        .filter_map(|x| async move { x.ok() });
    
    while let Some(value) = stream.recv().await {
        println!("Value: {}", value);
    }
}

高级功能

批量处理

use postage::prelude::*;
use futures::stream;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(0..100)
        .chunks(10);
    
    while let Some(batch) = stream.recv().await {
        println!("Batch: {:?}", batch);
    }
}

超时控制

use postage::prelude::*;
use futures::stream;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(0..5)
        .timeout(Duration::from_secs(1));
    
    while let Some(result) = stream.recv().await {
        match result {
            Ok(value) => println!("Value: {}", value),
            Err(_) => println!("Timeout occurred"),
        }
    }
}

完整示例demo

// 完整示例:使用postage库处理异步数据流
use postage::prelude::*;
use futures::{stream, sink};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("=== Stream扩展功能示例 ===");
    
    // 创建基础流并应用postage扩展方法
    let mut number_stream = stream::iter(1..=10)
        .map(|x| x * 2)                    // 将每个值乘以2
        .filter(|x| *x > 5)                // 过滤大于5的值
        .timeout(Duration::from_millis(500)); // 设置超时控制

    while let Some(result) = number_stream.recv().await {
        match result {
            Ok(value) => println!("处理后的数值: {}", value),
            Err(_) => println!("处理超时"),
        }
    }

    println!("\n=== Sink功能示例 ===");
    
    // 创建Sink并发送数据
    let mut message_sink = sink::drain()
        .sink_map_err(|_| "Sink映射错误"); // 错误映射

    message_sink.send("第一条消息").await?;
    message_sink.send("第二条消息").await?;
    println!("消息发送完成");

    println!("\n=== 批量处理示例 ===");
    
    // 批量处理数据
    let mut batch_stream = stream::iter(0..25)
        .chunks(5) // 每5个元素一批
        .map(|batch| {
            println!("处理批次: {:?}", batch);
            batch.into_iter().sum::<i32>()
        });

    while let Some(sum) = batch_stream.recv().await {
        println!("批次总和: {}", sum);
    }

    println!("\n=== 错误处理示例 ===");
    
    // 错误处理演示
    let mut error_stream = stream::iter(vec![
        Ok("成功数据1"),
        Err("模拟错误"),
        Ok("成功数据2"),
        Err("另一个错误"),
        Ok("成功数据3")
    ])
    .map_err(|e: &str| println!("捕获到错误: {}", e)) // 错误处理
    .filter_map(|x| async move { x.ok() }); // 过滤掉错误,只保留成功数据

    while let Some(value) = error_stream.recv().await {
        println!("有效数据: {}", value);
    }

    println!("\n=== 组合Stream和Sink示例 ===");
    
    // 组合使用Stream和Sink
    let data_source = stream::iter(vec!["数据A", "数据B", "数据C", "数据D"]);
    let mut data_sink = sink::drain()
        .sink_map_err(|_| "数据处理错误");

    // 将流中的数据转发到sink
    match data_source.forward(&mut data_sink).await {
        Ok(_) => println!("数据转发完成"),
        Err(e) => println!("转发错误: {}", e),
    }

    Ok(())
}

注意事项

  1. 确保正确导入postage的prelude模块以使用所有扩展方法
  2. 在处理错误时注意错误类型的转换
  3. 合理使用超时和背压控制以避免资源耗尽

postage库通过提供简洁的API和强大的组合能力,大大简化了Rust异步编程的复杂度,特别是在处理数据流管道时表现出色。

回到顶部