Rust流式处理库transform-stream的使用:高效转换与处理异步数据流的Rust插件库

Rust流式处理库transform-stream的使用:高效转换与处理异步数据流的Rust插件库

简介

transform-stream是一个轻量级的异步流包装库,灵感来源于tokio-rs的async-stream项目。

Latest Version Documentation License

安装

在项目目录中运行以下Cargo命令:

cargo add transform-stream

或者在Cargo.toml中添加:

transform-stream = "0.3.1"

示例代码

以下是transform-stream的基本使用示例:

use futures::stream::StreamExt;
use transform_stream::transform_stream;

// 创建一个简单的转换流
let stream = transform_stream(|mut sender| async move {
    for i in 0..5 {
        sender.send(i).await.unwrap();
    }
});

// 使用流
tokio::spawn(async move {
    let mut stream = stream;
    while let Some(item) = stream.next().await {
        println!("Received: {}", item);
    }
});

完整示例

下面是一个更完整的示例,展示了如何使用transform-stream处理异步数据流:

use futures::stream::{Stream, StreamExt};
use transform_stream::transform_stream;
use tokio::time::{sleep, Duration};

async fn process_data_stream() {
    // 创建转换流
    let number_stream = transform_stream(|mut sender| async move {
        for i in 1..=5 {
            sleep(Duration::from_millis(100)).await;
            sender.send(i * 10).await.unwrap();
        }
    });

    // 创建另一个流进行转换
    let transformed_stream = transform_stream(|mut sender| async move {
        let mut number_stream = number_stream;
        while let Some(num) = number_stream.next().await {
            let result = num + 5; // 对数据进行简单转换
            sender.send(result).await.unwrap();
        }
    });

    // 消费最终流
    let mut transformed_stream = transformed_stream;
    while let Some(item) = transformed_stream.next().await {
        println!("Processed value: {}", item);
    }
}

#[tokio::main]
async fn main() {
    process_data_stream().await;
}

文档

更多详细使用方法和API文档可参考官方文档。

仓库

项目源码位于GitHub仓库。

许可证

MIT License

分类

异步处理(Asynchronous)


1 回复

Rust流式处理库transform-stream的使用指南

transform-stream是一个用于高效转换与处理异步数据流的Rust库,它提供了类似Node.js中Transform Stream的功能,适合处理大量异步数据。

主要特性

  • 基于futurestokio的异步流处理
  • 支持链式转换操作
  • 内存高效,适合处理大型数据流
  • 提供多种内置转换器
  • 可自定义转换逻辑

基本使用方法

首先在Cargo.toml中添加依赖:

[dependencies]
transform-stream = "0.3"
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"

基础示例

use futures::StreamExt;
use transform_stream::TransformStream;

#[tokio::main]
async fn main() {
    // 创建一个简单的数字流
    let stream = futures::stream::iter(1..=10);
    
    // 使用transform-stream进行转换
    let transformed = TransformStream::new(stream, |x| async move {
        x * 2 // 将每个元素乘以2
    });
    
    // 收集结果
    let results: Vec<_> = transformed.collect().await;
    println!("{:?}", results); // 输出: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
}

链式转换

use futures::StreamExt;
use transform_stream::TransformStream;

#[tokio::main]
async fn main() {
    let stream = futures::stream::iter(1..=5);
    
    let result = TransformStream::new(stream, |x| async move { x * 2 })
        .transform(|x| async move { x + 1 })
        .transform(|x| async move { format!("Value: {}", x) })
        .collect::<Vec<_>>()
        .await;
    
    println!("{:?}", result);
    // 输出: ["Value: 3", "Value: 5", "Value: 7", "Value: 9", "Value: 11"]
}

错误处理

use futures::StreamExt;
use transform_stream::TransformStream;
use std::num::ParseIntError;

#[tokio::main]
async fn main() -> Result<(), ParseIntError> {
    let strings = vec!["1", "2", "three", "4"];
    let stream = futures::stream::iter(strings);
    
    let parsed = TransformStream::new(stream, |s| async move {
        s.parse::<i32>()
    });
    
    let results: Result<Vec<_>, _> = parsed.collect().await;
    
    match results {
        Ok(nums) => println!("Parsed numbers: {:?}", nums),
        Err(e) => println!("Error parsing: {}", e),
    }
    
    Ok(())
}

自定义转换器

use futures::StreamExt;
use transform_stream::TransformStream;

struct Doubler;

impl Doubler {
    async fn process(&self, x: i32) -> i32 {
        x * 2
    }
}

#[tokio::main]
async fn main() {
    let doubler = Doubler;
    let stream = futures::stream::iter(1..=5);
    
    let transformed = TransformStream::new(stream, |x| async move {
        doubler.process(x).await
    });
    
    let results: Vec<_> = transformed.collect().await;
    println!("{:?}", results); // [2, 4, 6, 8, 10]
}

性能提示

  1. 对于CPU密集型转换操作,考虑使用tokio::task::spawn_blocking
  2. 批量处理数据可以提高吞吐量
  3. 合理设置缓冲区大小以平衡内存使用和性能

transform-stream库非常适合处理各种异步数据转换场景,如日志处理、数据清洗、格式转换等任务。

完整示例demo

下面是一个结合多个特性的完整示例,展示如何使用transform-stream处理文件数据流:

use futures::StreamExt;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use transform_stream::TransformStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 打开文件并创建异步缓冲读取器
    let file = File::open("data.txt").await?;
    let reader = BufReader::new(file);
    
    // 创建行流
    let lines_stream = reader.lines();
    
    // 定义自定义转换器
    struct LineProcessor;
    
    impl LineProcessor {
        async fn process(&self, line: String) -> Result<usize, std::io::Error> {
            Ok(line.len()) // 返回每行的长度
        }
    }
    
    let processor = LineProcessor;
    
    // 创建转换流
    let transformed = TransformStream::new(lines_stream, |line_result| async move {
        match line_result {
            Ok(line) => processor.process(line).await,
            Err(e) => Err(e),
        }
    });
    
    // 链式转换:过滤掉短行
    let filtered = transformed
        .transform(|len_result| async move {
            match len_result {
                Ok(len) if len > 5 => Some(len),
                _ => None,
            }
        })
        .filter_map(|x| x); // 过滤掉None值
    
    // 收集结果
    let line_lengths: Vec<_> = filtered.collect().await;
    println!("Line lengths: {:?}", line_lengths);
    
    Ok(())
}

这个完整示例展示了:

  1. 从文件创建异步流
  2. 使用自定义转换器处理每行数据
  3. 链式转换和过滤操作
  4. 完整的错误处理流程
  5. 最终收集并输出结果

注意:要运行此示例,需要在项目目录下创建一个包含文本数据的data.txt文件。

回到顶部