Rust异步流处理库async-fn-stream的使用:高效转换async函数为Stream实现异步数据流
Rust异步流处理库async-fn-stream的使用:高效转换async函数为Stream实现异步数据流
async-fn-stream是一个无需宏的async-stream版本,提供了Stream
trait的通用实现。Stream
是std::iter::Iterator
的异步版本。
使用方式
基本用法
如果需要创建可能产生错误的流,使用try_fn_stream
,否则使用fn_stream
。
创建流的步骤:
- 调用
fn_stream
或try_fn_stream
,传入一个闭包 - 闭包会接收一个
emitter
- 要从流返回值,调用
emitter.emit(value)
并await
其结果
返回错误
try_fn_stream
提供了返回错误的便利方法:
- 可以通过
return Err(...)
或?
操作符从闭包返回错误,这将结束流 emitter
也有emit_err()
方法可以在不结束流的情况下返回错误
示例
有限数字流
use async_fn_stream::fn_stream;
use futures_util::Stream;
fn build_stream() -> impl Stream<Item = i32> {
fn_stream(|emitter| async move {
for i in 0..3 {
// 通过`emitter`从流中产生元素
emitter.emit(i).await;
}
})
}
从文本文件读取数字(带错误处理)
use anyhow::Context;
use async_fn_stream::try_fn_stream;
use futures_util::{pin_mut, Stream, StreamExt};
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
};
fn read_numbers(file_name: String) -> impl Stream<Item = Result<i32, anyhow::Error>> {
try_fn_stream(|emitter| async move {
// 通过`?`操作符返回错误
let file = BufReader::new(File::open(file_name).await.context("Failed to open file")?);
pin_mut!(file);
let mut line = String::new();
loop {
line.clear();
let byte_count = file
.read_line(&mut line)
.await
.context("Failed to read line")?;
if byte_count == 0 {
break;
}
for token in line.split_ascii_whitespace() {
let Ok(number) = token.parse::<i32>() else {
// 通过`emit_err`方法返回错误
emitter.emit_err(
anyhow::anyhow!("Failed to convert string \"{token}\" to number")
).await;
continue;
};
emitter.emit(number).await;
}
}
Ok(())
})
}
完整示例
下面是一个完整的示例,展示了如何使用async-fn-stream创建一个异步数据流:
use async_fn_stream::fn_stream;
use futures_util::StreamExt;
#[tokio::main]
async fn main() {
// 创建异步数据流
let stream = fn_stream(|emitter| async move {
for i in 1..=5 {
println!("生成值: {}", i);
emitter.emit(i).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
// 消费流数据
stream
.for_each(|item| async move {
println!("处理值: {}", item);
})
.await;
}
为什么不用async-stream?
async-stream很棒!它有很好的语法,但它是基于宏的,这带来了一些缺点:
- proc-macros有时会与IDE(如rust-analyzer或IntelliJ Rust)交互不良
- proc-macros可能会增加构建时间
安装
在项目目录中运行以下Cargo命令:
cargo add async-fn-stream
或者在Cargo.toml中添加:
async-fn-stream = "0.2.2"
1 回复
基于您提供的完整内容,我将为您整理关于async-fn-stream库的使用指南,并提供一个完整的示例demo。以下是直接从您提供的内容中提取的示例:
基本示例
use async_fn_stream::fn_stream;
use futures::Stream;
// 将async函数转换为Stream
fn count_stream(max: i32) -> impl Stream<Item = i32> {
fn_stream(|emitter| async move {
for i in 0..max {
emitter.emit(i).await;
}
})
}
#[tokio::main]
async fn main() {
let mut stream = count_stream(5);
while let Some(num) = stream.next().await {
println!("Got: {}", num);
}
}
处理异步数据源
use async_fn_stream::fn_stream;
use futures::{Stream, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
async fn fetch_data(id: i32) -> String {
sleep(Duration::from_millis(100)).await;
format!("Data-{}", id)
}
fn data_stream(ids: Vec<i32>) -> impl Stream<Item = String> {
fn_stream(|emitter| async move {
for id in ids {
let data = fetch_data(id).await;
emitter.emit(data).await;
}
})
}
#[tokio::main]
async fn main() {
let ids = vec![1, 2, 3, 4, 5];
let mut stream = data_stream(ids);
while let Some(data) = stream.next().await {
println!("Received: {}", data);
}
}
错误处理
use async_fn_stream::try_fn_stream;
use futures::{Stream, StreamExt};
use std::io;
async fn fallible_operation(i: i32) -> Result<String, io::Error> {
if i % 2 == 0 {
Ok(format!("Success-{}", i))
} else {
Err(io::Error::new(io::ErrorKind::Other, "Odd numbers fail"))
}
}
fn fallible_stream(max: i32) -> impl Stream<Item = Result<String, io::Error>> {
try_fn_stream(|emitter| async move {
for i in 0..max {
let result = fallible_operation(i).await;
emitter.emit(result).await;
}
})
}
#[tokio::main]
async fn main() {
let mut stream = fallible_stream(5);
while let Some(result) = stream.next().await {
match result {
Ok(data) => println!("Success: {}", data),
Err(e) => println!("Error: {}", e),
}
}
}
完整示例Demo
以下是一个完整的综合示例,展示了如何使用async-fn-stream处理来自多个API的并发请求:
use async_fn_stream::fn_stream;
use futures::{StreamExt, stream};
use reqwest::Client;
use serde_json::Value;
use std::time::Instant;
// 异步获取API数据
async fn fetch_api_data(client: &Client, url: &str) -> Result<Value, reqwest::Error> {
let response = client.get(url).send().await?;
response.json().await
}
// 创建并发API请求流
fn api_stream(urls: Vec<String>) -> impl Stream<Item = (String, Result<Value, reqwest::Error>)> {
fn_stream(|emitter| async move {
let client = Client::new();
let mut handles = vec![];
// 为每个URL创建异步任务
for url in urls {
let client = client.clone();
let emitter = emitter.clone();
handles.push(tokio::spawn(async move {
let result = fetch_api_data(&client, &url).await;
emitter.emit((url, result)).await;
}));
}
// 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
})
}
#[tokio::main]
async fn main() {
let api_endpoints = vec![
"https://api.example.com/data1".to_string(),
"https://api.example.com/data2".to_string(),
"https://api.example.com/data3".to_string(),
];
let start_time = Instant::now();
let stream = api_stream(api_endpoints);
// 处理流结果
let results: Vec<_> = stream.collect().await;
for (url, result) in results {
match result {
Ok(data) => println!("{}: 成功获取数据 {:?}", url, data),
Err(e) => println!("{}: 请求失败 {}", url, e),
}
}
println!("总耗时: {:?}", start_time.elapsed());
}
这个完整示例展示了:
- 使用async-fn-stream创建并发API请求流
- 使用reqwest库进行HTTP请求
- 使用tokio::spawn实现并发处理
- 收集和处理所有请求结果
- 测量总执行时间
您可以根据实际需求修改API端点和处理逻辑。记得在Cargo.toml中添加必要的依赖:
[dependencies]
async-fn-stream = "0.3"
futures = "0.3"
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }
serde_json = "1.0"