Rust调度任务管理库solana-unified-scheduler-pool的使用,高效统一的任务调度与线程池管理

以下是关于Rust调度任务管理库solana-unified-scheduler-pool的使用内容:


Rust调度任务管理库solana-unified-scheduler-pool的使用,高效统一的任务调度与线程池管理

安装

在项目目录中运行以下Cargo命令:

cargo add solana-unified-scheduler-pool

或者在Cargo.toml中添加:

solana-unified-scheduler-pool = "2.3.6"

基本使用示例

use solana_unified_scheduler_pool::{
    SchedulerPool, Task, TaskResult, TaskError
};
use std::sync::Arc;
use std::time::Duration;

// 定义一个简单的任务
fn simple_task(input: usize) -> TaskResult<usize, TaskError> {
    println!("Executing task with input: {}", input);
    Ok(input * 2)
}

fn main() {
    // 创建调度器线程池,默认使用CPU核心数作为线程数
    let scheduler = Arc::new(SchedulerPool::default());
    
    // 提交任务
    let task = Task::new(simple_task, 42);
    let future = scheduler.submit(task);
    
    // 等待任务完成并获取结果
    match futures::executor::block_on(future) {
        Ok(result) => println!("Task result: {}", result),
        Err(e) => eprintln!("Task failed: {:?}", e),
    }
    
    // 关闭调度器
    scheduler.shutdown(Some(Duration::from_secs(1)));
}

完整示例Demo

use solana_unified_scheduler_pool::{
    SchedulerPool, Task, TaskResult, TaskError, TaskPriority
};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::future::join_all;

// 定义不同的任务类型
fn fast_task(input: usize) -> TaskResult<usize, TaskError> {
    std::thread::sleep(Duration::from_millis(50));
    Ok(input * 2)
}

fn slow_task(input: usize) -> TaskResult<usize, TaskError> {
    std::thread::sleep(Duration::from_millis(500));
    Ok(input + 100)
}

#[tokio::main]
async fn main() {
    // 创建自定义配置的调度器
    let scheduler = Arc::new(SchedulerPool::builder()
        .num_threads(4)  // 设置4个工作线程
        .queue_capacity(100)  // 任务队列容量
        .build());
    
    let start_time = Instant::now();
    
    // 提交多个不同优先级的任务
    let mut futures = vec![];
    for i in 0..10 {
        let priority = if i % 2 == 0 {
            TaskPriority::High
        } else {
            TaskPriority::Normal
        };
        
        let task = if i < 5 {
            Task::with_priority(fast_task, i, priority)
        } else {
            Task::with_priority(slow_task, i, priority)
        };
        
        futures.push(scheduler.submit(task));
    }
    
    // 等待所有任务完成
    let results = join_all(futures).await;
    
    // 处理结果
    for (i, result) in results.iter().enumerate() {
        match result {
            Ok(val) => println!("Task {} succeeded with: {}", i, val),
            Err(e) => eprintln!("Task {} failed: {:?}", i, e),
        }
    }
    
    println!("All tasks completed in {:?}", start_time.elapsed());
    
    // 优雅关闭
    scheduler.shutdown(Some(Duration::from_secs(5)));
}

主要特性

  1. 统一任务调度:支持同步和异步任务
  2. 优先级队列:可以设置任务优先级
  3. 资源管理:可配置线程池大小和队列容量
  4. 优雅关闭:支持超时控制的关闭机制

注意:以上示例展示了该库的基本和高级用法,包括任务提交、优先级设置和批量处理。实际使用时请根据项目需求调整配置参数。


1 回复

Rust调度任务管理库solana-unified-scheduler-pool的使用

概述

solana-unified-scheduler-pool 是一个高效统一的任务调度与线程池管理库,最初为Solana区块链项目开发,现可作为通用Rust任务调度解决方案。它提供了灵活的任务调度能力和高效的线程池管理,特别适合需要高吞吐量任务处理的场景。

主要特性

  • 统一的任务调度接口
  • 高效的线程池管理
  • 支持任务优先级
  • 低开销的任务分发
  • 可配置的工作线程数量

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
solana-unified-scheduler-pool = "1.0"

基本使用示例

use solana_unified_scheduler_pool::{SchedulerPool, Task};

fn main() {
    // 创建调度器池,4个工作线程
    let scheduler_pool = SchedulerPool::new(4);
    
    // 定义一个任务
    let task = Task::new(|| {
        println!("Task executed on thread {:?}", std::thread::current().id());
    });
    
    // 提交任务到调度池
    scheduler_pool.schedule(task);
    
    // 等待所有任务完成
    scheduler_pool.join();
}

带优先级的任务调度

use solana_unified_scheduler_pool::{SchedulerPool, Task, Priority};

fn main() {
    let scheduler_pool = SchedulerPool::new(4);
    
    // 创建不同优先级的任务
    let high_priority_task = Task::with_priority(
        || println!("High priority task"),
        Priority::High
    );
    
    let normal_task = Task::new(|| println!("Normal task"));
    
    // 提交任务
    scheduler_pool.schedule(high_priority_task);
    scheduler_pool.schedule(normal_task);
    
    scheduler_pool.join();
}

批量任务提交

use solana_unified_scheduler_pool::{SchedulerPool, Task};

fn main() {
    let scheduler_pool = SchedulerPool::new(4);
    
    let tasks: Vec<_> = (0..10)
        .map(|i| Task::new(move || println!("Task {}", i)))
        .collect();
    
    // 批量提交任务
    scheduler_pool.schedule_batch(tasks);
    
    scheduler_pool.join();
}

高级配置

自定义线程池配置

use solana_unified_scheduler_pool::{SchedulerPool, SchedulerConfig};

fn main() {
    let config = SchedulerConfig {
        num_threads: 8,               // 8个工作线程
        thread_stack_size: Some(2 * 1024 * 1024), // 2MB栈大小
        ..Default::default()
    };
    
    let scheduler_pool = SchedulerPool::with_config(config);
    
    // ...使用调度池...
}

任务结果收集

use solana_unified_scheduler_pool::{SchedulerPool, Task};

fn main() {
    let scheduler_pool = SchedulerPool::new(4);
    
    let task = Task::with_result(|| {
        // 模拟耗时计算
        std::thread::sleep(std::time::Duration::from_millis(100));
        42 // 返回结果
    });
    
    let result_handle = task.result_handle();
    scheduler_pool.schedule(task);
    
    // 获取任务结果(会阻塞直到任务完成)
    let result = result_handle.get_result();
    println!("Task result: {}", result);
    
    scheduler_pool.join();
}

性能提示

  1. 对于大量小任务,使用schedule_batch比单个提交更高效
  2. 根据工作负载特点调整线程池大小
  3. 优先级任务应谨慎使用,过多高优先级任务可能导致低优先级任务饥饿

注意事项

  • 任务不应长时间阻塞线程,否则会影响整体调度性能
  • 确保所有任务都能在合理时间内完成,否则join可能会长时间阻塞
  • 任务闭包捕获的变量需要满足Send trait要求

完整示例

以下是一个综合使用solana-unified-scheduler-pool的完整示例:

use solana_unified_scheduler_pool::{SchedulerPool, Task, Priority, SchedulerConfig};
use std::thread;
use std::time::Duration;

fn main() {
    // 创建自定义配置的线程池
    let config = SchedulerConfig {
        num_threads: 4,
        thread_stack_size: Some(1024 * 1024), // 1MB栈大小
        ..Default::default()
    };
    let scheduler_pool = SchedulerPool::with_config(config);

    // 创建普通任务
    let normal_task = Task::new(|| {
        println!("Normal task started");
        thread::sleep(Duration::from_millis(50));
        println!("Normal task completed");
    });

    // 创建高优先级任务
    let high_priority_task = Task::with_priority(
        || {
            println!("High priority task started");
            thread::sleep(Duration::from_millis(20));
            println!("High priority task completed");
        },
        Priority::High,
    );

    // 创建带返回值的任务
    let result_task = Task::with_result(|| {
        println!("Result task started");
        thread::sleep(Duration::from_millis(30));
        42 // 返回值
    });
    let result_handle = result_task.result_handle();

    // 批量创建任务
    let batch_tasks: Vec<_> = (0..5)
        .map(|i| {
            Task::new(move || {
                println!("Batch task {} started", i);
                thread::sleep(Duration::from_millis(10 * (i + 1)));
                println!("Batch task {} completed", i);
            })
        })
        .collect();

    // 提交所有任务
    scheduler_pool.schedule(normal_task);
    scheduler_pool.schedule(high_priority_task);
    scheduler_pool.schedule(result_task);
    scheduler_pool.schedule_batch(batch_tasks);

    // 获取结果任务的值
    let result = result_handle.get_result();
    println!("Got result from task: {}", result);

    // 等待所有任务完成
    scheduler_pool.join();
    println!("All tasks completed");
}

这个完整示例展示了:

  1. 自定义线程池配置
  2. 创建普通任务、高优先级任务和带返回值的任务
  3. 批量任务创建和提交
  4. 获取任务结果
  5. 等待所有任务完成

输出会根据任务执行顺序而有所不同,但高优先级任务通常会先执行。

回到顶部