Rust异步流处理库async-fn-stream的使用:高效转换async函数为Stream实现异步数据流

Rust异步流处理库async-fn-stream的使用:高效转换async函数为Stream实现异步数据流

async-fn-stream是一个无需宏的async-stream版本,提供了Stream trait的通用实现。Streamstd::iter::Iterator的异步版本。

使用方式

基本用法

如果需要创建可能产生错误的流,使用try_fn_stream,否则使用fn_stream

创建流的步骤:

  1. 调用fn_streamtry_fn_stream,传入一个闭包
  2. 闭包会接收一个emitter
  3. 要从流返回值,调用emitter.emit(value)await其结果

返回错误

try_fn_stream提供了返回错误的便利方法:

  1. 可以通过return Err(...)?操作符从闭包返回错误,这将结束流
  2. emitter也有emit_err()方法可以在不结束流的情况下返回错误

示例

有限数字流

use async_fn_stream::fn_stream;
use futures_util::Stream;

fn build_stream() -> impl Stream<Item = i32> {
    fn_stream(|emitter| async move {
        for i in 0..3 {
            // 通过`emitter`从流中产生元素
            emitter.emit(i).await;
        }
    })
}

从文本文件读取数字(带错误处理)

use anyhow::Context;
use async_fn_stream::try_fn_stream;
use futures_util::{pin_mut, Stream, StreamExt};
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

fn read_numbers(file_name: String) -> impl Stream<Item = Result<i32, anyhow::Error>> {
    try_fn_stream(|emitter| async move {
        // 通过`?`操作符返回错误
        let file = BufReader::new(File::open(file_name).await.context("Failed to open file")?);
        pin_mut!(file);
        let mut line = String::new();
        loop {
            line.clear();
            let byte_count = file
                .read_line(&mut line)
                .await
                .context("Failed to read line")?;
            if byte_count == 0 {
                break;
            }

            for token in line.split_ascii_whitespace() {
                let Ok(number) = token.parse::<i32>() else {
                    // 通过`emit_err`方法返回错误
                    emitter.emit_err(
                        anyhow::anyhow!("Failed to convert string \"{token}\" to number")
                    ).await;
                    continue;
                };
                emitter.emit(number).await;
            }
        }

        Ok(())
    })
}

完整示例

下面是一个完整的示例,展示了如何使用async-fn-stream创建一个异步数据流:

use async_fn_stream::fn_stream;
use futures_util::StreamExt;

#[tokio::main]
async fn main() {
    // 创建异步数据流
    let stream = fn_stream(|emitter| async move {
        for i in 1..=5 {
            println!("生成值: {}", i);
            emitter.emit(i).await;
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });

    // 消费流数据
    stream
        .for_each(|item| async move {
            println!("处理值: {}", item);
        })
        .await;
}

为什么不用async-stream?

async-stream很棒!它有很好的语法,但它是基于宏的,这带来了一些缺点:

  • proc-macros有时会与IDE(如rust-analyzer或IntelliJ Rust)交互不良
  • proc-macros可能会增加构建时间

安装

在项目目录中运行以下Cargo命令:

cargo add async-fn-stream

或者在Cargo.toml中添加:

async-fn-stream = "0.2.2"

1 回复

基于您提供的完整内容,我将为您整理关于async-fn-stream库的使用指南,并提供一个完整的示例demo。以下是直接从您提供的内容中提取的示例:

基本示例

use async_fn_stream::fn_stream;
use futures::Stream;

// 将async函数转换为Stream
fn count_stream(max: i32) -> impl Stream<Item = i32> {
    fn_stream(|emitter| async move {
        for i in 0..max {
            emitter.emit(i).await;
        }
    })
}

#[tokio::main]
async fn main() {
    let mut stream = count_stream(5);
    
    while let Some(num) = stream.next().await {
        println!("Got: {}", num);
    }
}

处理异步数据源

use async_fn_stream::fn_stream;
use futures::{Stream, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn fetch_data(id: i32) -> String {
    sleep(Duration::from_millis(100)).await;
    format!("Data-{}", id)
}

fn data_stream(ids: Vec<i32>) -> impl Stream<Item = String> {
    fn_stream(|emitter| async move {
        for id in ids {
            let data = fetch_data(id).await;
            emitter.emit(data).await;
        }
    })
}

#[tokio::main]
async fn main() {
    let ids = vec![1, 2, 3, 4, 5];
    let mut stream = data_stream(ids);
    
    while let Some(data) = stream.next().await {
        println!("Received: {}", data);
    }
}

错误处理

use async_fn_stream::try_fn_stream;
use futures::{Stream, StreamExt};
use std::io;

async fn fallible_operation(i: i32) -> Result<String, io::Error> {
    if i % 2 == 0 {
        Ok(format!("Success-{}", i))
    } else {
        Err(io::Error::new(io::ErrorKind::Other, "Odd numbers fail"))
    }
}

fn fallible_stream(max: i32) -> impl Stream<Item = Result<String, io::Error>> {
    try_fn_stream(|emitter| async move {
        for i in 0..max {
            let result = fallible_operation(i).await;
            emitter.emit(result).await;
        }
    })
}

#[tokio::main]
async fn main() {
    let mut stream = fallible_stream(5);
    
    while let Some(result) = stream.next().await {
        match result {
            Ok(data) => println!("Success: {}", data),
            Err(e) => println!("Error: {}", e),
        }
    }
}

完整示例Demo

以下是一个完整的综合示例,展示了如何使用async-fn-stream处理来自多个API的并发请求:

use async_fn_stream::fn_stream;
use futures::{StreamExt, stream};
use reqwest::Client;
use serde_json::Value;
use std::time::Instant;

// 异步获取API数据
async fn fetch_api_data(client: &Client, url: &str) -> Result<Value, reqwest::Error> {
    let response = client.get(url).send().await?;
    response.json().await
}

// 创建并发API请求流
fn api_stream(urls: Vec<String>) -> impl Stream<Item = (String, Result<Value, reqwest::Error>)> {
    fn_stream(|emitter| async move {
        let client = Client::new();
        let mut handles = vec![];

        // 为每个URL创建异步任务
        for url in urls {
            let client = client.clone();
            let emitter = emitter.clone();
            handles.push(tokio::spawn(async move {
                let result = fetch_api_data(&client, &url).await;
                emitter.emit((url, result)).await;
            }));
        }

        // 等待所有任务完成
        for handle in handles {
            handle.await.unwrap();
        }
    })
}

#[tokio::main]
async fn main() {
    let api_endpoints = vec![
        "https://api.example.com/data1".to_string(),
        "https://api.example.com/data2".to_string(),
        "https://api.example.com/data3".to_string(),
    ];

    let start_time = Instant::now();
    let stream = api_stream(api_endpoints);

    // 处理流结果
    let results: Vec<_> = stream.collect().await;
    
    for (url, result) in results {
        match result {
            Ok(data) => println!("{}: 成功获取数据 {:?}", url, data),
            Err(e) => println!("{}: 请求失败 {}", url, e),
        }
    }

    println!("总耗时: {:?}", start_time.elapsed());
}

这个完整示例展示了:

  1. 使用async-fn-stream创建并发API请求流
  2. 使用reqwest库进行HTTP请求
  3. 使用tokio::spawn实现并发处理
  4. 收集和处理所有请求结果
  5. 测量总执行时间

您可以根据实际需求修改API端点和处理逻辑。记得在Cargo.toml中添加必要的依赖:

[dependencies]
async-fn-stream = "0.3"
futures = "0.3"
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }
serde_json = "1.0"
回到顶部