Rust异步任务分发库async-dispatcher的使用:高效管理并发与调度任务

Rust异步任务分发库async-dispatcher的使用:高效管理并发与调度任务

async-dispatcher是一个允许异步库在不绑定特定异步运行时的情况下生成任务和设置定时器的库。它的核心需求来自于能够使用原生操作系统调度器。

库的基本使用

库可以以通用方式生成任务:

use async_dispatcher::{spawn, sleep};

pub async fn my_library_function() {
    let task = spawn(async {
        sleep(Duration::from_secs(1)).await;
        println!("in a spawned task!");
    });

    // ...
}

应用程序控制任务分发

使用这些库的应用程序可以通过实现Dispatcher trait来控制如何分发工作:

use async_dispatcher::{set_dispatcher, Dispatcher, Runnable};

struct MyAppDispatcher;

impl Dispatcher for MyAppDispatcher {
    fn dispatch(&self, runnable: Runnable) {
        // 实现任务分发逻辑
    }

    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
        // 实现延迟任务分发逻辑
    }
}

fn main() {
    set_dispatcher(MyAppDispatcher);

    async_dispatcher::block_on(async move {
        my_library_function().await;
    });
}

完整示例

下面是一个完整的示例,展示如何使用async-dispatcher库:

use std::time::Duration;
use async_dispatcher::{spawn, sleep, set_dispatcher, Dispatcher, Runnable, block_on};

// 自定义分发器实现
struct MyAppDispatcher;

impl Dispatcher for MyAppDispatcher {
    fn dispatch(&self, runnable: Runnable) {
        println!("Dispatching task immediately");
        // 在实际应用中,这里会将任务提交到线程池或异步运行时
        tokio::spawn(async move {
            runnable.run().await;
        });
    }

    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
        println!("Scheduling task after {:?}", duration);
        // 在实际应用中,这里会设置一个定时器
        tokio::spawn(async move {
            tokio::time::sleep(duration).await;
            runnable.run().await;
        });
    }
}

// 示例库函数
async fn my_library_function() {
    println!("Library function started");
    
    // 立即生成任务
    let task1 = spawn(async {
        println!("Task 1 started");
        sleep(Duration::from_millis(500)).await;
        println!("Task 1 completed");
    });

    // 延迟生成任务
    let task2 = spawn(async {
        println!("Task 2 started after delay");
        sleep(Duration::from_millis(300)).await;
        println!("Task 2 completed");
    });

    // 等待任务完成
    task1.await;
    task2.await;
    
    println!("Library function completed");
}

#[tokio::main]
async fn main() {
    // 设置自定义分发器
    set_dispatcher(MyAppDispatcher);
    
    // 运行库函数
    block_on(async {
        my_library_function().await;
    }).await;
}

安装

要使用async-dispatcher,可以运行以下Cargo命令:

cargo add async-dispatcher

或者在Cargo.toml中添加:

async-dispatcher = "0.1.2"

这个库的主要优势在于它允许库作者编写异步代码而不绑定特定的运行时,同时让应用程序开发者可以灵活地选择如何执行这些异步任务。

完整示例demo

基于上述内容,下面是一个更完整的async-dispatcher使用示例:

use std::time::Duration;
use async_dispatcher::{
    spawn, sleep, set_dispatcher, 
    Dispatcher, Runnable, block_on
};

// 自定义分发器实现
struct TokioDispatcher;

impl Dispatcher for TokioDispatcher {
    fn dispatch(&self, runnable: Runnable) {
        // 使用tokio运行时立即执行任务
        tokio::spawn(async move {
            println!("[Dispatcher] Running task immediately");
            runnable.run().await;
        });
    }

    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
        // 使用tokio定时器延迟执行任务
        tokio::spawn(async move {
            println!("[Dispatcher] Scheduling task after {:?}", duration);
            tokio::time::sleep(duration).await;
            runnable.run().await;
        });
    }
}

// 模拟库函数
async fn network_operations() {
    println!("Starting network operations...");
    
    // 并发发起多个网络请求
    let req1 = spawn(async {
        println!("[Request 1] Started");
        sleep(Duration::from_millis(800)).await;
        println!("[Request 1] Completed");
    });

    let req2 = spawn(async {
        println!("[Request 2] Started");
        sleep(Duration::from_millis(500)).await;
        println!("[Request 2] Completed");
    });

    // 定时任务
    let delayed_task = spawn(async {
        println!("[Delayed Task] Started after delay");
        sleep(Duration::from_millis(300)).await;
        println!("[Delayed Task] Completed");
    });

    // 等待所有任务完成
    req1.await;
    req2.await;
    delayed_task.await;
    
    println!("All network operations completed!");
}

#[tokio::main]
async fn main() {
    // 设置tokio分发器
    set_dispatcher(TokioDispatcher);
    
    println!("Application started");
    
    // 执行库函数
    block_on(async {
        network_operations().await;
    }).await;
    
    println!("Application finished");
}

这个示例展示了:

  1. 自定义TokioDispatcher实现
  2. 模拟网络请求的并发执行
  3. 延迟任务的调度
  4. 完整的async-dispatcher工作流程

要运行这个示例,需要在Cargo.toml中添加:

[dependencies]
async-dispatcher = "0.1.2"
tokio = { version = "1.0", features = ["full"] }

1 回复

Rust异步任务分发库async-dispatcher使用指南

介绍

async-dispatcher是一个用于高效管理并发和调度异步任务的Rust库。它提供了一个轻量级的任务分发系统,特别适合需要处理大量短期异步任务的场景。

该库的主要特点包括:

  • 轻量级任务调度
  • 高效的并发管理
  • 简单的API接口
  • 与Rust的async/await语法无缝集成

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
async-dispatcher = "0.3"
tokio = { version = "1.0", features = ["full"] }

基本使用示例

use async_dispatcher::Dispatcher;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建一个最大并发数为4的分发器
    let dispatcher = Dispatcher::new(4);
    
    // 提交5个任务
    for i in 0..5 {
        dispatcher.dispatch(async move {
            println!("任务 {} 开始执行", i);
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("任务 {} 完成", i);
        }).await;
    }
    
    // 等待所有任务完成
    dispatcher.join().await;
}

带返回值的任务

use async_dispatcher::Dispatcher;

#[tokio::main]
async fn main() {
    let dispatcher = Dispatcher::new(2);
    
    let handles: Vec<_> = (0..3)
        .map(|i| {
            dispatcher.dispatch_with_result(async move {
                i * 2
            })
        })
        .collect();
    
    for handle in handles {
        let result = handle.await.unwrap();
        println!("任务结果: {}", result);
    }
    
    dispatcher.join().await;
}

动态调整并发度

use async_dispatcher::Dispatcher;

#[tokio::main]
async fn main() {
    let dispatcher = Dispatcher::new(2);
    
    // 运行时调整并发度
    dispatcher.set_concurrency(4).await;
    
    // 提交任务...
}

高级用法

任务优先级

use async_dispatcher::{Dispatcher, Priority};

#[tokio::main]
async fn main() {
    let dispatcher = Dispatcher::new(2);
    
    dispatcher.dispatch_with_priority(
        async {
            println!("高优先级任务");
        },
        Priority::High
    ).await;
    
    dispatcher.dispatch_with_priority(
        async {
            println!("低优先级任务");
        },
        Priority::Low
    ).await;
    
    dispatcher.join().await;
}

错误处理

use async_dispatcher::Dispatcher;

#[tokio::main]
async fn main() {
    let dispatcher = Dispatcher::new(2);
    
    let handle = dispatcher.dispatch_with_result(async {
        if some_condition {
            Ok(42)
        } else {
            Err("发生了错误")
        }
    }).await;
    
    match handle.await {
        Ok(result) => println!("成功: {}", result),
        Err(e) => println!("错误: {}", e),
    }
    
    dispatcher.join().await;
}

性能建议

  1. 根据任务类型选择合适的并发度:

    • CPU密集型任务:建议设置为CPU核心数
    • IO密集型任务:可以设置更高的并发度
  2. 对于大量短时任务,可以预先创建分发器并复用

  3. 考虑使用优先级功能来确保关键任务优先执行

完整示例demo

下面是一个结合了基本使用、带返回值任务、优先级和错误处理的完整示例:

use async_dispatcher::{Dispatcher, Priority};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建并发度为3的分发器
    let dispatcher = Dispatcher::new(3);
    
    // 基本任务示例
    for i in 0..3 {
        dispatcher.dispatch(async move {
            println!("基础任务 {} 开始", i);
            tokio::time::sleep(Duration::from_millis(500)).await;
            println!("基础任务 {} 结束", i);
        }).await;
    }
    
    // 带返回值的任务
    let result_handles: Vec<_> = (0..2)
        .map(|i| {
            dispatcher.dispatch_with_result(async move {
                // 模拟可能失败的任务
                if i % 2 == 0 {
                    Ok(format!("结果 {}", i * 10))
                } else {
                    Err(format!("任务 {} 失败", i))
                }
            })
        })
        .collect();
    
    // 带优先级的任务
    dispatcher.dispatch_with_priority(
        async {
            println!("!!!紧急任务开始执行");
            tokio::time::sleep(Duration::from_millis(200)).await;
            println!("!!!紧急任务完成");
        },
        Priority::High
    ).await;
    
    // 动态调整并发度
    dispatcher.set_concurrency(5).await;
    println("已将并发度调整为5");
    
    // 处理带返回值的任务结果
    for handle in result_handles {
        match handle.await {
            Ok(res) => println!("任务成功: {}", res),
            Err(e) => println!("任务失败: {}", e),
        }
    }
    
    // 等待所有任务完成
    dispatcher.join().await;
    println!("所有任务已完成");
}

这个完整示例展示了:

  1. 基本异步任务调度
  2. 带返回值和错误处理的任务
  3. 任务优先级的使用
  4. 运行时动态调整并发度
  5. 等待所有任务完成的正确方式

您可以根据实际需求调整并发度和任务类型,以充分利用async-dispatcher的功能。

回到顶部