Rust异步I/O调度库compio-dispatcher的使用,高效管理并发任务与事件驱动的异步调度器

Rust异步I/O调度库compio-dispatcher的使用,高效管理并发任务与事件驱动的异步调度器

Compio Logo

Compio是一个基于线程每核心的Rust运行时,支持IOCP/io_uring/polling。名称来源于"completion-based IO"。这个库受到了monoio的启发。

为什么不是Tokio?

Tokio是一个优秀的通用异步运行时。然而,它是基于轮询的,甚至在Windows上使用了未公开的API。我们希望有一些新的高级API来执行IOCP/io_uring。

tokio-uring不同,这个运行时不是基于Tokio的。这主要是因为mio中没有公开的API来控制IOCP,而且tokiomio达到1.0之前不会暴露API来控制mio

快速开始

添加compio作为依赖:

compio = { version = "0.13.1", features = ["macros"] }

然后我们可以使用高级API来执行文件系统和网络IO。

use compio::{fs::File, io::AsyncReadAtExt};

#[compio::main]
async fn main() {
    let file = File::open("Cargo.toml").await.unwrap();
    let (read, buffer) = file.read_to_end_at(Vec::with_capacity(1024), 0).await.unwrap();
    assert_eq!(read, buffer.len());
    let buffer = String::from_utf8(buffer).unwrap();
    println!("{}", buffer);
}

完整示例:使用compio-dispatcher管理并发任务

以下是使用compio-dispatcher管理并发任务的完整示例:

use compio::{
    dispatcher::Dispatcher,
    fs::File,
    io::{AsyncReadAtExt, AsyncWriteAtExt},
};

#[compio::main]
async fn main() {
    // 创建一个调度器
    let dispatcher = Dispatcher::new();

    // 并发执行多个文件操作任务
    let task1 = dispatcher.spawn(async {
        let file = File::open("file1.txt").await.unwrap();
        let (read, buffer) = file.read_to_end_at(Vec::new(), 0).await.unwrap();
        (read, buffer)
    });

    let task2 = dispatcher.spawn(async {
        let file = File::create("file2.txt").await.unwrap();
        let written = file.write_at("Hello, Compio!".as_bytes(), 0).await.unwrap();
        written
    });

    // 等待所有任务完成
    let (result1, result2) = futures::join!(task1, task2);

    println!("Task 1 read {} bytes", result1.unwrap().0);
    println!("Task 2 wrote {} bytes", result2.unwrap());
}

这个示例展示了如何使用compio-dispatcher来并发执行多个文件操作任务。调度器会自动管理这些任务的执行,确保高效利用系统资源。

贡献

无论您是刚接触Rust还是经验丰富的专家,都可以为Compio做出贡献。如果您有任何关于Compio的问题,欢迎加入我们的Telegram群组。在贡献之前,请查看我们的贡献指南。

许可证

MIT


1 回复

Rust异步I/O调度库compio-dispatcher的使用:高效管理并发任务与事件驱动的异步调度器

完整示例demo

下面是一个综合使用compio-dispatcher各种功能的完整示例:

use compio_dispatcher::{Dispatcher, Event, Priority};
use std::time::Duration;

#[compio::main]
async fn main() {
    // 1. 创建自定义调度器
    let dispatcher = Dispatcher::builder()
        .worker_threads(4)  // 设置4个工作线程
        .build()
        .unwrap();
    
    // 2. 创建事件用于任务间通信
    let event = Event::new();
    
    // 3. 提交不同优先级的任务
    let high_prio_task = dispatcher.spawn_with_priority(async {
        println!("[High Priority] Task started");
        compio::time::sleep(Duration::from_millis(50)).await;
        println!("[High Priority] Task completed");
        100
    }, Priority::High);
    
    let normal_task = dispatcher.spawn(async {
        println!("[Normal Priority] Task started");
        compio::time::sleep(Duration::from_millis(100)).await;
        println!("[Normal Priority] Task completed");
        200
    });
    
    // 4. 事件监听任务
    let event_listener = dispatcher.spawn({
        let event = event.clone();
        async move {
            println!("[EventListener] Waiting for event...");
            event.wait().await;
            println!("[EventListener] Event received!");
            300
        }
    });
    
    // 5. 批量提交任务
    let batch_tasks: Vec<_> = (0..3).map(|i| {
        dispatcher.spawn(async move {
            println!("[Batch Task {}] Started", i);
            compio::time::sleep(Duration::from_millis(i * 50)).await;
            println!("[Batch Task {}] Completed", i);
            i * 100
        })
    }).collect();
    
    // 6. 触发事件的异步任务
    dispatcher.spawn(async {
        compio::time::sleep(Duration::from_secs(1)).await;
        println!("[Event Trigger] Notifying all listeners");
        event.notify();
    });
    
    // 7. 等待所有任务完成
    let (high_res, normal_res, event_res) = 
        futures::join!(high_prio_task, normal_task, event_listener);
    
    let batch_results = futures::future::join_all(batch_tasks).await;
    
    println!("\nFinal Results:");
    println!("High priority task result: {:?}", high_res.unwrap());
    println!("Normal priority task result: {:?}", normal_res.unwrap());
    println!("Event listener result: {:?}", event_res.unwrap());
    println!("Batch tasks results: {:?}", batch_results);
    
    // 8. 关闭调度器
    dispatcher.shutdown();
}

示例说明

  1. 创建了一个自定义配置的调度器,设置了4个工作线程
  2. 创建了一个Event用于任务间通信
  3. 提交了高优先级和普通优先级的任务各一个
  4. 创建了一个监听事件的任务
  5. 批量提交了3个任务
  6. 创建了一个延迟1秒后触发事件的任务
  7. 使用futures::join等待所有任务完成
  8. 最后关闭调度器

预期输出

输出顺序可能会因任务调度而有所不同,但高优先级任务通常会先执行:

[High Priority] Task started
[Normal Priority] Task started
[EventListener] Waiting for event...
[Batch Task 0] Started
[Batch Task 1] Started
[Batch Task 2] Started
[High Priority] Task completed
[Batch Task 0] Completed
[Normal Priority] Task completed
[Batch Task 1] Completed
[Batch Task 2] Completed
[Event Trigger] Notifying all listeners
[EventListener] Event received!

Final Results:
High priority task result: 100
Normal priority task result: 200
Event listener result: 300
Batch tasks results: [Ok(0), Ok(100), Ok(200)]

这个示例展示了compio-dispatcher的主要功能,包括任务调度、优先级控制、事件驱动和批量任务处理。

回到顶部