Rust线程池库affinitypool的使用:高效管理CPU亲和性与任务调度的并发工具
Rust线程池库affinitypool的使用:高效管理CPU亲和性与任务调度的并发工具
affinitypool是一个用于在专用线程池上运行阻塞任务的线程池库。阻塞任务可以异步发送到线程池,任务将排队直到有工作线程空闲来处理任务。任务按FIFO顺序处理。
对于优化的工作负载,可以指定每个线程的亲和性,确保每个线程可以请求绑定到特定的CPU核心,从而允许更多的并行性,并为阻塞工作负载提供更好的性能保证。
原始来源
该代码主要受到threadpool库的启发,采用Apache License 2.0和MIT双重许可。
安装
在项目目录中运行以下Cargo命令:
cargo add affinitypool
或者在Cargo.toml中添加:
affinitypool = "0.3.1"
示例代码
以下是使用affinitypool的完整示例:
use affinitypool::AffinityPool;
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个包含4个线程的线程池
let pool = AffinityPool::new(4);
// 设置线程亲和性(将线程绑定到特定CPU核心)
pool.set_affinity(&[0, 1, 2, 3]).expect("Failed to set thread affinity");
// 提交任务到线程池
for i in 0..8 {
pool.execute(move || {
println!("Task {} started on thread {:?}", i, thread::current().id());
// 模拟阻塞任务
thread::sleep(Duration::from_secs(1));
println!("Task {} completed", i);
});
}
// 等待所有任务完成
pool.join();
println!("All tasks completed");
}
代码说明
AffinityPool::new(4)
创建一个包含4个工作线程的线程池。set_affinity(&[0, 1, 2, 3])
将4个线程分别绑定到CPU核心0、1、2和3。execute()
方法用于提交任务到线程池,任务以闭包形式提供。join()
方法等待所有任务完成。
特性
- 支持CPU亲和性设置,提高缓存命中率
- 异步任务提交
- FIFO任务调度
- 阻塞任务专用线程池
- 简单易用的API
affinitypool特别适合需要处理大量阻塞任务且对性能要求较高的应用场景,如网络服务器、数据库连接池等。
完整示例demo
以下是更完整的affinitypool使用示例,展示如何创建线程池、设置亲和性、提交不同类型任务并处理结果:
use affinitypool::AffinityPool;
use std::thread;
use std::time::Duration;
use std::sync::mpsc;
fn main() {
// 创建一个包含CPU核心数两倍的线程池
let num_cores = num_cpus::get();
let pool = AffinityPool::new(num_cores * 2);
// 设置线程亲和性,将线程均匀分配到所有CPU核心上
let mut affinity = Vec::with_capacity(num_cores * 2);
for i in 0..num_cores * 2 {
affinity.push(i % num_cores);
}
pool.set_affinity(&affinity).expect("Failed to set thread affinity");
// 创建一个通道用于接收任务结果
let (tx, rx) = mpsc::channel();
// 提交CPU密集型任务
for i in 0..10 {
let tx = tx.clone();
pool.execute(move || {
println!("CPU密集型任务 {} 开始于线程 {:?}", i, thread::current().id());
// 模拟CPU密集型计算
let mut sum = 0;
for j in 0..1_000_000 {
sum += j;
}
tx.send(format!("任务 {} 完成,结果: {}", i, sum)).unwrap();
});
}
// 提交I/O密集型任务
for i in 10..20 {
let tx = tx.clone();
pool.execute(move || {
println!("I/O密集型任务 {} 开始于线程 {:?}", i, thread::current().id());
// 模拟I/O等待
thread::sleep(Duration::from_millis(500));
tx.send(format!("任务 {} 完成", i)).unwrap();
});
}
// 等待所有任务完成
pool.join();
// 收集并打印结果
for _ in 0..20 {
println!("{}", rx.recv().unwrap());
}
println!("所有任务完成");
}
完整示例说明
- 动态获取CPU核心数量创建线程池
- 将线程均匀分配到所有CPU核心上
- 使用通道(mpsc)收集任务结果
- 演示两种不同类型任务的处理:
- CPU密集型任务(模拟计算)
- I/O密集型任务(模拟等待)
- 使用
join()
确保所有任务完成 - 收集并打印所有任务结果
这个完整示例展示了affinitypool在实际应用中的典型用法,包括任务分发、结果收集和性能优化配置。
Rust线程池库affinitypool的使用:高效管理CPU亲和性与任务调度的并发工具
介绍
affinitypool是一个Rust线程池库,专注于高效管理CPU亲和性和任务调度。它允许你将特定任务绑定到特定的CPU核心上运行,从而优化缓存利用率并减少线程迁移开销,特别适合高性能计算和低延迟应用场景。
主要特性
- 可配置的线程池大小
- 支持设置CPU亲和性
- 任务优先级调度
- 轻量级实现
- 与标准库Future兼容
使用方法
基本使用
首先在Cargo.toml中添加依赖:
[dependencies]
affinitypool = "0.2"
创建线程池
use affinitypool::AffinityPool;
fn main() {
// 创建4个工作线程的线程池
let pool = AffinityPool::new(4).expect("Failed to create thread pool");
// 提交任务
let result = pool.run(|| {
// 这里执行计算密集型任务
42
}).expect("Failed to submit task");
println!("Task result: {}", result);
}
设置CPU亲和性
use affinitypool::AffinityPool;
use core_affinity::CoreId;
fn main() {
// 获取可用的CPU核心
let core_ids = core_affinity::get_core_ids().unwrap();
// 创建线程池并设置亲和性
let pool = AffinityPool::with_affinity(core_ids.len(), |thread_idx| {
Some(core_ids[thread_idx % core_ids.len()])
}).expect("Failed to create thread pool with affinity");
// 提交多个任务
let handles: Vec<_> = (0..10).map(|i| {
pool.run(move || {
println!("Task {} running on thread {:?}", i, std::thread::current().id());
i * i
}).unwrap()
}).collect();
// 等待所有任务完成
for handle in handles {
println!("Result: {}", handle.join().unwrap());
}
}
使用异步任务
use affinitypool::AffinityPool;
use futures::future::join_all;
#[tokio::main]
async fn main() {
let pool = AffinityPool::new(4).unwrap();
let futures: Vec<_> = (0..10).map(|i| {
pool.spawn_async(async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
i * 2
})
}).collect();
let results = join_all(futures).await;
println!("Results: {:?}", results);
}
高级配置
use affinitypool::{AffinityPool, AffinityPoolBuilder};
fn main() {
let pool = AffinityPoolBuilder::new()
.num_threads(8)
.thread_name("worker".to_string())
.thread_stack_size(2 * 1024 * 1024) // 2MB栈大小
.affinity(|thread_idx| {
// 将线程均匀分配到所有核心上
let core_ids = core_affinity::get_core_ids().unwrap();
Some(core_ids[thread_idx % core_ids.len()])
})
.build()
.unwrap();
// 使用线程池...
}
完整示例demo
下面是一个完整的示例,展示了如何使用affinitypool进行CPU绑定的矩阵乘法计算:
use affinitypool::AffinityPool;
use core_affinity::CoreId;
use std::sync::Arc;
use std::time::Instant;
const SIZE: usize = 512;
// 矩阵类型
type Matrix = Vec<Vec<f64>>;
fn main() {
// 初始化两个随机矩阵
let a = generate_matrix(SIZE);
let b = generate_matrix(SIZE);
// 获取CPU核心信息
let core_ids = core_affinity::get_core_ids().unwrap();
println!("Available CPU cores: {}", core_ids.len());
// 创建线程池,每个线程绑定到不同的核心
let pool = AffinityPool::with_affinity(core_ids.len(), |thread_idx| {
Some(core_ids[thread_idx % core_ids.len()])
}).expect("Failed to create thread pool with affinity");
// 将矩阵包装在Arc中以实现线程安全共享
let a = Arc::new(a);
let b = Arc::new(b);
let start = Instant::now();
// 分割工作负载
let chunk_size = SIZE / core_ids.len();
let mut handles = Vec::new();
for i in 0..core_ids.len() {
let a = Arc::clone(&a);
let b = Arc::clone(&b);
let start_row = i * chunk_size;
let end_row = if i == core_ids.len() - 1 {
SIZE
} else {
(i + 1) * chunk_size
};
// 提交任务到线程池
handles.push(pool.run(move || {
let mut result = vec![vec![0.0; SIZE]; end_row - start_row];
for i in start_row..end_row {
for j in 0..SIZE {
for k in 0..SIZE {
result[i - start_row][j] += a[i][k] * b[k][j];
}
}
}
(start_row, end_row, result)
}).unwrap());
}
// 收集结果
let mut c = vec![vec![0.0; SIZE]; SIZE];
for handle in handles {
let (start, end, partial) = handle.join().unwrap();
c[start..end].copy_from_slice(&partial);
}
let duration = start.elapsed();
println!("Matrix multiplication took: {:?}", duration);
println!("Sample result: {}", c[0][0]);
}
// 生成随机矩阵
fn generate_matrix(size: usize) -> Matrix {
(0..size)
.map(|_| {
(0..size)
.map(|_| rand::random::<f64>())
.collect()
})
.collect()
}
性能建议
- 对于计算密集型任务,将线程数设置为物理核心数
- 对于I/O密集型任务,可以适当增加线程数
- 将相关任务绑定到同一个CPU核心上可以提高缓存命中率
- 避免频繁创建和销毁线程池
注意事项
- 在Linux系统上需要适当的权限来设置CPU亲和性
- 过度使用CPU亲和性可能会影响操作系统的调度优化
- 在某些虚拟化环境中可能无法设置CPU亲和性
affinitypool为需要精细控制线程调度和CPU亲和性的应用提供了强大的工具,特别适合高性能计算、游戏服务器、金融交易等低延迟场景。