Rust异步编程库futures-preview的使用,提供高效并发和Future抽象支持
Rust异步编程库futures-preview的使用,提供高效并发和Future抽象支持
公告
当前futures crate的开发正在0.3分支上进行。
futures-rs简介
这是一个Rust中零成本futures的实现库。
使用方式
首先,在您的Cargo.toml
中添加:
[dependencies]
futures = "0.2.2"
然后在您的crate中添加:
extern crate futures;
use futures::Future;
no_std支持
futures-rs
可以在没有标准库的环境下工作,比如裸机环境。但此时它的API会大幅减少。要在#[no_std]
环境中使用futures-rs
,请使用:
[dependencies]
futures = { version = "0.2.2", default-features = false }
完整示例代码
extern crate futures;
use futures::Future;
use futures::future::lazy;
fn main() {
// 创建一个简单的future
let future = lazy(|| {
println!("Executing in the future!");
Ok::<(), ()>(())
});
// 执行future
futures::executor::block_on(future).unwrap();
// 使用组合器
let combined = future.and_then(|_| {
println!("Chained future execution");
Ok(())
});
futures::executor::block_on(combined).unwrap();
// 并发执行多个future
let futures = vec![
lazy(|| { println!("Future 1"); Ok(1) }),
lazy(|| { println!("Future 2"); Ok(2) }),
lazy(|| { println!("Future 3"); Ok(3) }),
];
let joined = futures::future::join_all(futures);
let results = futures::executor::block_on(joined).unwrap();
println!("Results: {:?}", results);
}
示例说明
- 首先创建一个简单的
lazy
future,它会在执行时打印消息 - 使用
block_on
执行单个future - 使用
and_then
组合器将多个future串联执行 - 使用
join_all
并发执行多个future并收集结果
许可证
本项目采用以下任一许可证:
- Apache License, Version 2.0
- MIT license
贡献
除非您明确声明,否则您有意为Futures提交的任何贡献(如Apache-2.0许可证中所定义)都应按照上述方式双重许可,且不附加任何其他条款或条件。
扩展示例代码
extern crate futures;
use futures::Future;
use futures::future::{lazy, ok, err};
use futures::stream::{self, Stream};
use futures::future::Either;
fn main() {
// 示例1: 基本Future使用
let future1 = ok::<i32, i32>(42)
.and_then(|val| {
println!("Got value: {}", val);
Ok(val + 1)
})
.map(|val| {
println!("Mapped value: {}", val);
val
});
let result1 = futures::executor::block_on(future1).unwrap();
println!("Result1: {}", result1);
// 示例2: 错误处理
let future2 = err::<i32, i32>(0)
.or_else(|err| {
println!("Got error: {}", err);
Ok(err + 1)
});
let result2 = futures::executor::block_on(future2).unwrap();
println!("Result2: {}", result2);
// 示例3: Stream使用
let stream = stream::iter_ok::<_, i32>(vec![1, 2, 3])
.for_each(|val| {
println!("Stream value: {}", val);
Ok(())
});
futures::executor::block_on(stream).unwrap();
// 示例4: 选择第一个完成的Future
let future_a = lazy(|| {
std::thread::sleep(std::time::Duration::from_millis(100));
Ok::<&str, &str>("future a")
});
let future_b = lazy(|| {
std::thread::sleep(std::time::Duration::from_millis(50));
Ok::<&str, &str>("future b")
});
let future_select = future_a.select(future_b)
.then(|res| match res {
Ok(Either::Left((a, _))) => Ok(a),
Ok(Either::Right((b, _))) => Ok(b),
Err(_) => Err("error"),
});
let result_select = futures::executor::block_on(future_select).unwrap();
println!("First completed: {}", result_select);
}
扩展示例说明
- 展示了基本的Future链式操作,包括
and_then
和map
方法 - 演示了错误处理流程,使用
or_else
处理错误情况 - 展示了Stream的使用,通过
for_each
处理流中的每个元素 - 演示了使用
select
选择第一个完成的Future
1 回复
Rust异步编程库futures-preview的使用指南
介绍
futures-preview
是Rust生态中一个强大的异步编程库,它提供了高效的并发抽象和Future支持。这个库是标准库中std::future
的基础,也是async/await语法的底层支撑。
主要特点:
- 提供Future trait的稳定实现
- 支持高效的异步I/O操作
- 包含各种组合器用于处理异步操作
- 与async/await语法无缝集成
- 支持任务调度和执行器
使用方法
添加依赖
首先在Cargo.toml中添加依赖:
[dependencies]
futures-preview = { version = "0.3.0-alpha.19", features = ["async-await"] }
基本示例
use futures::executor::block_on;
use futures::future::FutureExt; // 用于组合器方法
async fn hello_world() {
println!("Hello, world!");
}
fn main() {
let future = hello_world(); // 创建一个Future
block_on(future); // 执行Future直到完成
}
Future组合器
use futures::future::{self, FutureExt, TryFutureExt};
async fn compute() -> Result<i32, String> {
Ok(42)
}
async fn app() -> Result<(), String> {
let result = compute()
.map_ok(|val| val * 2) // 成功时转换值
.map_err(|极e| format!("Error: {}", e)) // 错误时转换错误
.await?;
println!("Result: {}", result);
Ok(())
}
fn main() {
futures::executor::block_on(app()).unwrap();
}
并发执行多个Future
use futures::future::{join, try_join};
use futures::executor::block_on;
async fn get_user() -> Result<String, String> {
Ok("Alice".to_string())
}
async fn get_age() -> Result<u32, String> {
Ok(30)
}
async fn app() -> Result<(), String> {
// 并行执行两个future,等待两者都完成
let (user, age) = try_join(get_user(), get_age()).await?;
println!("User: {}, Age: {}", user, age);
Ok(())
}
fn main() {
block_on(app()).unwrap();
}
流处理(Streams)
use futures::stream::{self, StreamExt};
use futures::executor::block_on;
async fn process_stream() {
let stream = stream::iter(1..=5);
// 处理流中的每个元素
let sum = stream
.map(|x| x * 2)
.fold(0, |acc, x| async move { acc + x })
.await;
println!("Sum: {}", sum);
}
fn main() {
block_on(process_stream());
}
选择第一个完成的Future
use futures::future::{self, FutureExt, select};
use futures::pin_mut;
async fn task_one() -> &'static str {
// 模拟耗时操作
future::pending().await;
"one"
}
async fn task_two() -> &'static str {
"two"
}
async fn app() {
let t1 = task_one().fuse();
let t2 = task_two().fuse();
pin_mut!(t1);
pin_mut!(t2);
match future::select(t1, t2).await {
future::Either::Left((res, _)) => println!("Task one finished first: {:?}", res),
future::Either::Right((res, _)) => println!("Task two finished first: {:?}", res),
}
}
fn main() {
futures::executor::block_on(app());
}
高级用法
自定义Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::executor::block_on;
struct MyFuture {
count: u32,
}
impl Future for MyFuture {
type Output = u32;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.count += 1;
if self.count >= 3 {
Poll::Ready(self.count)
} else {
cx.waker().wake_by_ref(); // 通知执行器再次poll
Poll::Pending
}
}
}
fn main() {
let future = MyFuture { count: 0 };
let result = block_on(future);
println!("Result: {}", result); // 输出: Result: 3
}
使用通道进行任务间通信
use futures::channel极::mpsc;
use futures::executor::block_on;
use futures::SinkExt;
use futures::StreamExt;
async fn sender(mut tx: mpsc::Sender<i32>) {
for i in 0..5 {
tx.send(i).await.unwrap();
}
}
async fn receiver(mut rx: mpsc::Receiver<i32>) {
while let Some(item) = rx.next().await {
println!("Received: {}", item);
}
}
fn main() {
let (tx, rx) = mpsc::channel(5);
let sender_task = sender(tx);
let receiver_task = receiver(rx);
block_on(async {
futures::join!(sender_task, receiver_task);
});
}
注意事项
futures-preview
是早期版本,建议新项目使用futures
crate的稳定版本- 异步代码需要执行器(executor)来驱动,可以使用
block_on
或集成Tokio等运行时 - 注意Pin和Unpin trait在异步编程中的重要性
- 资源清理通常使用
AsyncDrop
模式处理
通过futures-preview
库,你可以构建高效的异步应用,利用Rust强大的类型系统和零成本抽象来实现高性能并发编程。