Rust异步任务执行器futures-executor-preview的使用:高效管理Future并发与调度

Rust异步任务执行器futures-executor-preview的使用:高效管理Future并发与调度

安装

在项目目录中运行以下Cargo命令:

cargo add futures-executor-preview

或者在Cargo.toml中添加以下行:

futures-executor-preview = "0.2.2"

基本用法

futures-executor-preview提供了几种执行Future的方式:

use futures::executor::block_on;
use futures::future::{join_all, ready};

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    // 使用block_on同步执行一个Future
    block_on(hello_world());
    
    // 并发执行多个Future
    let futures = vec![ready(1), ready(2), ready(3)];
    let results = block_on(join_all(futures));
    println!("{:?}", results); // 输出: [1, 2, 3]
}

完整示例

下面是一个更完整的示例,展示如何使用线程池执行器来并发执行多个异步任务:

use futures::executor::{ThreadPool, block_on};
use futures::future::{join_all, lazy};
use std::time::Duration;

// 模拟异步任务
async fn async_task(id: u32, duration: u64) -> u32 {
    println!("Task {} started", id);
    futures::executor::block_on(lazy(|_| {
        std::thread::sleep(Duration::from_millis(duration));
    }));
    println!("Task {} completed", id);
    id
}

fn main() {
    // 创建线程池执行器
    let pool = ThreadPool::new().expect("Failed to build thread pool");
    
    // 创建多个异步任务
    let tasks = vec![
        pool.spawn_with_handle(async_task(1, 100)),
        pool.spawn_with_handle(async_task(2, 200)),
        pool.spawn_with_handle(async_task(3, 50)),
    ];
    
    // 等待所有任务完成
    let results = block_on(join_all(tasks));
    println!("All tasks completed: {:?}", results);
}

示例说明

  1. block_on - 同步阻塞当前线程直到Future完成
  2. ThreadPool - 创建一个线程池来并发执行多个Future
  3. spawn_with_handle - 将Future提交到线程池执行并返回Handle
  4. join_all - 等待多个Future全部完成

特性

  • 轻量级的Future执行器
  • 提供线程池实现
  • 支持本地和全局执行器
  • 与标准库Future兼容

1 回复

Rust异步任务执行器futures-executor-preview的使用:高效管理Future并发与调度

介绍

futures-executor-preview是Rust中用于执行和管理Future的轻量级执行器,属于futures库的一部分。它提供了一种简单的方式来运行异步任务,特别适合需要高效并发和调度的场景。

这个执行器主要提供了两种执行方式:

  1. ThreadPool - 线程池执行器,适合CPU密集型任务
  2. LocalPool - 单线程执行器,适合I/O密集型任务或需要确定性的场景

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
futures-executor-preview = "0.3.0-alpha.19"

基本示例

使用ThreadPool

use futures::future;
use futures_executor::ThreadPool;

async fn compute_square(x: i32) -> i32 {
    x * x
}

fn main() {
    // 创建线程池
    let pool = ThreadPool::new().expect("Failed to build thread pool");
    
    // 创建一组异步任务
    let futures = vec![
        pool.spawn_with_handle(compute_square(2)),
        pool.spawn_with_handle(compute_square(3)),
        pool.spawn_with_handle(compute_square(4)),
    ];
    
    // 等待所有任务完成并收集结果
    let results = pool.run(future::join_all(futures));
    
    println!("Results: {:?}", results); // 输出: Results: [4, 9, 16]
}

使用LocalPool

use futures::future;
use futures_executor::LocalPool;

async fn greet(name: &str) -> String {
    format!("Hello, {}!", name)
}

fn main() {
    // 创建本地执行器
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    
    // 创建异步任务
    let task1 = spawner.spawn_local(greet("Alice"));
    let task2 = spawner.spawn_local(greet("Bob"));
    
    // 运行并获取结果
    let (result1, result2) = pool.run_until(future::join(task1, task2));
    
    println!("{}", result1); // 输出: Hello, Alice!
    println!("{}", result2); // 输出: Hello, Bob!
}

高级用法

任务优先级控制

use futures::future;
use futures_executor::{ThreadPool, ThreadPoolBuilder};
use futures::task::SpawnExt;

async fn high_priority_task() {
    println!("High priority task running");
}

async fn low_priority_task() {
    println!("Low priority task running");
}

fn main() {
    // 创建带有自定义设置的线程池
    let pool = ThreadPoolBuilder::new()
        .pool_size(4)
        .create()
        .unwrap();
    
    // 使用spawn_with_handle可以获取Future以便后续处理
    let high_future = pool.spawn_with_handle(high_priority_task()).unwrap();
    let low_future = pool.spawn_with_handle(low_priority_task()).unwrap();
    
    // 等待任务完成
    pool.run(future::join(high_future, low_future));
}

定时任务

use futures::future;
use futures_executor::LocalPool;
use futures_t timer::Delay;
use std::time::Duration;

async fn delayed_task() {
    println!("Task started");
    Delay::new(Duration::from_secs(2)).await;
    println!("Task completed after delay");
}

fn main() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    
    let task = spawner.spawn_local(delayed_task());
    
    println!("Starting task...");
    pool.run_until(task);
    println!("All tasks completed");
}

完整示例代码

下面是一个结合了ThreadPool和LocalPool的完整示例,展示如何在实际项目中管理不同类型的异步任务:

use futures::future;
use futures_executor::{ThreadPool, LocalPool, ThreadPoolBuilder};
use futures_timer::Delay;
use std::time::Duration;

// CPU密集型任务
async fn heavy_computation(n: u32) -> u32 {
    println!("Starting computation for {}", n);
    let mut result = 1;
    for i in 1..=n {
        result *= i;
    }
    println!("Completed computation for {}", n);
    result
}

// I/O密集型任务
async fn simulate_io_task(id: u32) -> String {
    println!("IO task {} started", id);
    Delay::new(Duration::from_secs(1)).await;
    println!("IO task {} completed", id);
    format!("Result from IO task {}", id)
}

fn main() {
    // 创建线程池处理CPU密集型任务
    let cpu_pool = ThreadPoolBuilder::new()
        .pool_size(2)  // 限制并发线程数
        .create()
        .unwrap();
    
    // 创建本地执行器处理I/O任务
    let mut io_pool = LocalPool::new();
    let io_spawner = io_pool.spawner();
    
    // 提交CPU密集型任务
    let cpu_task1 = cpu_pool.spawn_with_handle(heavy_computation(10)).unwrap();
    let cpu_task2 = cpu_pool.spawn_with_handle(heavy_computation(15)).unwrap();
    
    // 提交I/O密集型任务
    let io_task1 = io_spawner.spawn_local(simulate_io_task(1));
    let io_task2 = io_spawner.spawn_local(simulate_io_task(2));
    
    // 等待所有任务完成
    let cpu_results = cpu_pool.run(future::join(cpu_task1, cpu_task2));
    let io_results = io_pool.run_until(future::join(io_task1, io_task2));
    
    println!("CPU results: {:?}", cpu_results);
    println!("IO results: {:?}", io_results);
}

注意事项

  1. futures-executor-preview是预览版API,未来可能会有变化
  2. 对于CPU密集型任务,ThreadPool通常更高效
  3. 对于I/O密集型或需要确定性的任务,LocalPool可能是更好的选择
  4. 确保正确处理任务中的错误,避免静默失败
  5. 注意任务之间的依赖关系,避免死锁

通过合理使用futures-executor-preview,可以有效地管理和调度异步任务,提高程序的并发性能。

回到顶部