Rust异步任务执行器futures-executor-preview的使用:高效管理Future并发与调度
Rust异步任务执行器futures-executor-preview的使用:高效管理Future并发与调度
安装
在项目目录中运行以下Cargo命令:
cargo add futures-executor-preview
或者在Cargo.toml中添加以下行:
futures-executor-preview = "0.2.2"
基本用法
futures-executor-preview提供了几种执行Future的方式:
use futures::executor::block_on;
use futures::future::{join_all, ready};
async fn hello_world() {
println!("hello, world!");
}
fn main() {
// 使用block_on同步执行一个Future
block_on(hello_world());
// 并发执行多个Future
let futures = vec![ready(1), ready(2), ready(3)];
let results = block_on(join_all(futures));
println!("{:?}", results); // 输出: [1, 2, 3]
}
完整示例
下面是一个更完整的示例,展示如何使用线程池执行器来并发执行多个异步任务:
use futures::executor::{ThreadPool, block_on};
use futures::future::{join_all, lazy};
use std::time::Duration;
// 模拟异步任务
async fn async_task(id: u32, duration: u64) -> u32 {
println!("Task {} started", id);
futures::executor::block_on(lazy(|_| {
std::thread::sleep(Duration::from_millis(duration));
}));
println!("Task {} completed", id);
id
}
fn main() {
// 创建线程池执行器
let pool = ThreadPool::new().expect("Failed to build thread pool");
// 创建多个异步任务
let tasks = vec![
pool.spawn_with_handle(async_task(1, 100)),
pool.spawn_with_handle(async_task(2, 200)),
pool.spawn_with_handle(async_task(3, 50)),
];
// 等待所有任务完成
let results = block_on(join_all(tasks));
println!("All tasks completed: {:?}", results);
}
示例说明
- block_on - 同步阻塞当前线程直到Future完成
- ThreadPool - 创建一个线程池来并发执行多个Future
- spawn_with_handle - 将Future提交到线程池执行并返回Handle
- join_all - 等待多个Future全部完成
特性
- 轻量级的Future执行器
- 提供线程池实现
- 支持本地和全局执行器
- 与标准库Future兼容
1 回复
Rust异步任务执行器futures-executor-preview
的使用:高效管理Future并发与调度
介绍
futures-executor-preview
是Rust中用于执行和管理Future
的轻量级执行器,属于futures
库的一部分。它提供了一种简单的方式来运行异步任务,特别适合需要高效并发和调度的场景。
这个执行器主要提供了两种执行方式:
ThreadPool
- 线程池执行器,适合CPU密集型任务LocalPool
- 单线程执行器,适合I/O密集型任务或需要确定性的场景
使用方法
添加依赖
首先在Cargo.toml
中添加依赖:
[dependencies]
futures-executor-preview = "0.3.0-alpha.19"
基本示例
使用ThreadPool
use futures::future;
use futures_executor::ThreadPool;
async fn compute_square(x: i32) -> i32 {
x * x
}
fn main() {
// 创建线程池
let pool = ThreadPool::new().expect("Failed to build thread pool");
// 创建一组异步任务
let futures = vec![
pool.spawn_with_handle(compute_square(2)),
pool.spawn_with_handle(compute_square(3)),
pool.spawn_with_handle(compute_square(4)),
];
// 等待所有任务完成并收集结果
let results = pool.run(future::join_all(futures));
println!("Results: {:?}", results); // 输出: Results: [4, 9, 16]
}
使用LocalPool
use futures::future;
use futures_executor::LocalPool;
async fn greet(name: &str) -> String {
format!("Hello, {}!", name)
}
fn main() {
// 创建本地执行器
let mut pool = LocalPool::new();
let spawner = pool.spawner();
// 创建异步任务
let task1 = spawner.spawn_local(greet("Alice"));
let task2 = spawner.spawn_local(greet("Bob"));
// 运行并获取结果
let (result1, result2) = pool.run_until(future::join(task1, task2));
println!("{}", result1); // 输出: Hello, Alice!
println!("{}", result2); // 输出: Hello, Bob!
}
高级用法
任务优先级控制
use futures::future;
use futures_executor::{ThreadPool, ThreadPoolBuilder};
use futures::task::SpawnExt;
async fn high_priority_task() {
println!("High priority task running");
}
async fn low_priority_task() {
println!("Low priority task running");
}
fn main() {
// 创建带有自定义设置的线程池
let pool = ThreadPoolBuilder::new()
.pool_size(4)
.create()
.unwrap();
// 使用spawn_with_handle可以获取Future以便后续处理
let high_future = pool.spawn_with_handle(high_priority_task()).unwrap();
let low_future = pool.spawn_with_handle(low_priority_task()).unwrap();
// 等待任务完成
pool.run(future::join(high_future, low_future));
}
定时任务
use futures::future;
use futures_executor::LocalPool;
use futures_t timer::Delay;
use std::time::Duration;
async fn delayed_task() {
println!("Task started");
Delay::new(Duration::from_secs(2)).await;
println!("Task completed after delay");
}
fn main() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let task = spawner.spawn_local(delayed_task());
println!("Starting task...");
pool.run_until(task);
println!("All tasks completed");
}
完整示例代码
下面是一个结合了ThreadPool和LocalPool的完整示例,展示如何在实际项目中管理不同类型的异步任务:
use futures::future;
use futures_executor::{ThreadPool, LocalPool, ThreadPoolBuilder};
use futures_timer::Delay;
use std::time::Duration;
// CPU密集型任务
async fn heavy_computation(n: u32) -> u32 {
println!("Starting computation for {}", n);
let mut result = 1;
for i in 1..=n {
result *= i;
}
println!("Completed computation for {}", n);
result
}
// I/O密集型任务
async fn simulate_io_task(id: u32) -> String {
println!("IO task {} started", id);
Delay::new(Duration::from_secs(1)).await;
println!("IO task {} completed", id);
format!("Result from IO task {}", id)
}
fn main() {
// 创建线程池处理CPU密集型任务
let cpu_pool = ThreadPoolBuilder::new()
.pool_size(2) // 限制并发线程数
.create()
.unwrap();
// 创建本地执行器处理I/O任务
let mut io_pool = LocalPool::new();
let io_spawner = io_pool.spawner();
// 提交CPU密集型任务
let cpu_task1 = cpu_pool.spawn_with_handle(heavy_computation(10)).unwrap();
let cpu_task2 = cpu_pool.spawn_with_handle(heavy_computation(15)).unwrap();
// 提交I/O密集型任务
let io_task1 = io_spawner.spawn_local(simulate_io_task(1));
let io_task2 = io_spawner.spawn_local(simulate_io_task(2));
// 等待所有任务完成
let cpu_results = cpu_pool.run(future::join(cpu_task1, cpu_task2));
let io_results = io_pool.run_until(future::join(io_task1, io_task2));
println!("CPU results: {:?}", cpu_results);
println!("IO results: {:?}", io_results);
}
注意事项
futures-executor-preview
是预览版API,未来可能会有变化- 对于CPU密集型任务,
ThreadPool
通常更高效 - 对于I/O密集型或需要确定性的任务,
LocalPool
可能是更好的选择 - 确保正确处理任务中的错误,避免静默失败
- 注意任务之间的依赖关系,避免死锁
通过合理使用futures-executor-preview
,可以有效地管理和调度异步任务,提高程序的并发性能。