Rust线程池库affinitypool的使用:高效管理CPU亲和性与任务调度的并发工具

Rust线程池库affinitypool的使用:高效管理CPU亲和性与任务调度的并发工具

affinitypool是一个用于在专用线程池上运行阻塞任务的线程池库。阻塞任务可以异步发送到线程池,任务将排队直到有工作线程空闲来处理任务。任务按FIFO顺序处理。

对于优化的工作负载,可以指定每个线程的亲和性,确保每个线程可以请求绑定到特定的CPU核心,从而允许更多的并行性,并为阻塞工作负载提供更好的性能保证。

原始来源

该代码主要受到threadpool库的启发,采用Apache License 2.0和MIT双重许可。

安装

在项目目录中运行以下Cargo命令:

cargo add affinitypool

或者在Cargo.toml中添加:

affinitypool = "0.3.1"

示例代码

以下是使用affinitypool的完整示例:

use affinitypool::AffinityPool;
use std::thread;
use std::time::Duration;

fn main() {
    // 创建一个包含4个线程的线程池
    let pool = AffinityPool::new(4);
    
    // 设置线程亲和性(将线程绑定到特定CPU核心)
    pool.set_affinity(&[0, 1, 2, 3]).expect("Failed to set thread affinity");
    
    // 提交任务到线程池
    for i in 0..8 {
        pool.execute(move || {
            println!("Task {} started on thread {:?}", i, thread::current().id());
            // 模拟阻塞任务
            thread::sleep(Duration::from_secs(1));
            println!("Task {} completed", i);
        });
    }
    
    // 等待所有任务完成
    pool.join();
    println!("All tasks completed");
}

代码说明

  1. AffinityPool::new(4) 创建一个包含4个工作线程的线程池。
  2. set_affinity(&[0, 1, 2, 3]) 将4个线程分别绑定到CPU核心0、1、2和3。
  3. execute() 方法用于提交任务到线程池,任务以闭包形式提供。
  4. join() 方法等待所有任务完成。

特性

  • 支持CPU亲和性设置,提高缓存命中率
  • 异步任务提交
  • FIFO任务调度
  • 阻塞任务专用线程池
  • 简单易用的API

affinitypool特别适合需要处理大量阻塞任务且对性能要求较高的应用场景,如网络服务器、数据库连接池等。

完整示例demo

以下是更完整的affinitypool使用示例,展示如何创建线程池、设置亲和性、提交不同类型任务并处理结果:

use affinitypool::AffinityPool;
use std::thread;
use std::time::Duration;
use std::sync::mpsc;

fn main() {
    // 创建一个包含CPU核心数两倍的线程池
    let num_cores = num_cpus::get();
    let pool = AffinityPool::new(num_cores * 2);
    
    // 设置线程亲和性,将线程均匀分配到所有CPU核心上
    let mut affinity = Vec::with_capacity(num_cores * 2);
    for i in 0..num_cores * 2 {
        affinity.push(i % num_cores);
    }
    pool.set_affinity(&affinity).expect("Failed to set thread affinity");

    // 创建一个通道用于接收任务结果
    let (tx, rx) = mpsc::channel();

    // 提交CPU密集型任务
    for i in 0..10 {
        let tx = tx.clone();
        pool.execute(move || {
            println!("CPU密集型任务 {} 开始于线程 {:?}", i, thread::current().id());
            
            // 模拟CPU密集型计算
            let mut sum = 0;
            for j in 0..1_000_000 {
                sum += j;
            }
            
            tx.send(format!("任务 {} 完成,结果: {}", i, sum)).unwrap();
        });
    }

    // 提交I/O密集型任务
    for i in 10..20 {
        let tx = tx.clone();
        pool.execute(move || {
            println!("I/O密集型任务 {} 开始于线程 {:?}", i, thread::current().id());
            
            // 模拟I/O等待
            thread::sleep(Duration::from_millis(500));
            
            tx.send(format!("任务 {} 完成", i)).unwrap();
        });
    }

    // 等待所有任务完成
    pool.join();
    
    // 收集并打印结果
    for _ in 0..20 {
        println!("{}", rx.recv().unwrap());
    }

    println!("所有任务完成");
}

完整示例说明

  1. 动态获取CPU核心数量创建线程池
  2. 将线程均匀分配到所有CPU核心上
  3. 使用通道(mpsc)收集任务结果
  4. 演示两种不同类型任务的处理:
    • CPU密集型任务(模拟计算)
    • I/O密集型任务(模拟等待)
  5. 使用join()确保所有任务完成
  6. 收集并打印所有任务结果

这个完整示例展示了affinitypool在实际应用中的典型用法,包括任务分发、结果收集和性能优化配置。


1 回复

Rust线程池库affinitypool的使用:高效管理CPU亲和性与任务调度的并发工具

介绍

affinitypool是一个Rust线程池库,专注于高效管理CPU亲和性和任务调度。它允许你将特定任务绑定到特定的CPU核心上运行,从而优化缓存利用率并减少线程迁移开销,特别适合高性能计算和低延迟应用场景。

主要特性

  • 可配置的线程池大小
  • 支持设置CPU亲和性
  • 任务优先级调度
  • 轻量级实现
  • 与标准库Future兼容

使用方法

基本使用

首先在Cargo.toml中添加依赖:

[dependencies]
affinitypool = "0.2"

创建线程池

use affinitypool::AffinityPool;

fn main() {
    // 创建4个工作线程的线程池
    let pool = AffinityPool::new(4).expect("Failed to create thread pool");
    
    // 提交任务
    let result = pool.run(|| {
        // 这里执行计算密集型任务
        42
    }).expect("Failed to submit task");
    
    println!("Task result: {}", result);
}

设置CPU亲和性

use affinitypool::AffinityPool;
use core_affinity::CoreId;

fn main() {
    // 获取可用的CPU核心
    let core_ids = core_affinity::get_core_ids().unwrap();
    
    // 创建线程池并设置亲和性
    let pool = AffinityPool::with_affinity(core_ids.len(), |thread_idx| {
        Some(core_ids[thread_idx % core_ids.len()])
    }).expect("Failed to create thread pool with affinity");
    
    // 提交多个任务
    let handles: Vec<_> = (0..10).map(|i| {
        pool.run(move || {
            println!("Task {} running on thread {:?}", i, std::thread::current().id());
            i * i
        }).unwrap()
    }).collect();
    
    // 等待所有任务完成
    for handle in handles {
        println!("Result: {}", handle.join().unwrap());
    }
}

使用异步任务

use affinitypool::AffinityPool;
use futures::future::join_all;

#[tokio::main]
async fn main() {
    let pool = AffinityPool::new(4).unwrap();
    
    let futures: Vec<_> = (0..10).map(|i| {
        pool.spawn_async(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            i * 2
        })
    }).collect();
    
    let results = join_all(futures).await;
    println!("Results: {:?}", results);
}

高级配置

use affinitypool::{AffinityPool, AffinityPoolBuilder};

fn main() {
    let pool = AffinityPoolBuilder::new()
        .num_threads(8)
        .thread_name("worker".to_string())
        .thread_stack_size(2 * 1024 * 1024) // 2MB栈大小
        .affinity(|thread_idx| {
            // 将线程均匀分配到所有核心上
            let core_ids = core_affinity::get_core_ids().unwrap();
            Some(core_ids[thread_idx % core_ids.len()])
        })
        .build()
        .unwrap();
    
    // 使用线程池...
}

完整示例demo

下面是一个完整的示例,展示了如何使用affinitypool进行CPU绑定的矩阵乘法计算:

use affinitypool::AffinityPool;
use core_affinity::CoreId;
use std::sync::Arc;
use std::time::Instant;

const SIZE: usize = 512;

// 矩阵类型
type Matrix = Vec<Vec<f64>>;

fn main() {
    // 初始化两个随机矩阵
    let a = generate_matrix(SIZE);
    let b = generate_matrix(SIZE);
    
    // 获取CPU核心信息
    let core_ids = core_affinity::get_core_ids().unwrap();
    println!("Available CPU cores: {}", core_ids.len());
    
    // 创建线程池,每个线程绑定到不同的核心
    let pool = AffinityPool::with_affinity(core_ids.len(), |thread_idx| {
        Some(core_ids[thread_idx % core_ids.len()])
    }).expect("Failed to create thread pool with affinity");
    
    // 将矩阵包装在Arc中以实现线程安全共享
    let a = Arc::new(a);
    let b = Arc::new(b);
    
    let start = Instant::now();
    
    // 分割工作负载
    let chunk_size = SIZE / core_ids.len();
    let mut handles = Vec::new();
    
    for i in 0..core_ids.len() {
        let a = Arc::clone(&a);
        let b = Arc::clone(&b);
        let start_row = i * chunk_size;
        let end_row = if i == core_ids.len() - 1 {
            SIZE
        } else {
            (i + 1) * chunk_size
        };
        
        // 提交任务到线程池
        handles.push(pool.run(move || {
            let mut result = vec![vec![0.0; SIZE]; end_row - start_row];
            for i in start_row..end_row {
                for j in 0..SIZE {
                    for k in 0..SIZE {
                        result[i - start_row][j] += a[i][k] * b[k][j];
                    }
                }
            }
            (start_row, end_row, result)
        }).unwrap());
    }
    
    // 收集结果
    let mut c = vec![vec![0.0; SIZE]; SIZE];
    for handle in handles {
        let (start, end, partial) = handle.join().unwrap();
        c[start..end].copy_from_slice(&partial);
    }
    
    let duration = start.elapsed();
    println!("Matrix multiplication took: {:?}", duration);
    println!("Sample result: {}", c[0][0]);
}

// 生成随机矩阵
fn generate_matrix(size: usize) -> Matrix {
    (0..size)
        .map(|_| {
            (0..size)
                .map(|_| rand::random::<f64>())
                .collect()
        })
        .collect()
}

性能建议

  1. 对于计算密集型任务,将线程数设置为物理核心数
  2. 对于I/O密集型任务,可以适当增加线程数
  3. 将相关任务绑定到同一个CPU核心上可以提高缓存命中率
  4. 避免频繁创建和销毁线程池

注意事项

  • 在Linux系统上需要适当的权限来设置CPU亲和性
  • 过度使用CPU亲和性可能会影响操作系统的调度优化
  • 在某些虚拟化环境中可能无法设置CPU亲和性

affinitypool为需要精细控制线程调度和CPU亲和性的应用提供了强大的工具,特别适合高性能计算、游戏服务器、金融交易等低延迟场景。

回到顶部