Rust并行计算库rayon-core的使用,高性能数据并行处理与多线程任务调度库
Rust并行计算库rayon-core的使用,高性能数据并行处理与多线程任务调度库
注意:这是一个为rustc使用而创建的不稳定分支
Rayon-core代表了Rayon的"核心、稳定"API:join、scope等,以及使用ThreadPool创建自定义线程池的能力。
值得注意:用户不一定需要直接访问rayon-core;它的所有API都在rayon crate中有镜像。为此,文档中的示例使用rayon::join等而不是rayon_core::join。
rayon-core的目标是永远或几乎永远不会对其API进行破坏性更改,因为rayon-core的每个修订版也托管全局线程池(因此如果您有两个同时存在的rayon-core版本,您将有两个线程池)。
rayon-core目前需要rustc 1.63.0
或更高版本。
安装
在项目目录中运行以下Cargo命令:
cargo add rustc-rayon-core
或者在Cargo.toml中添加以下行:
rustc-rayon-core = "0.5.1"
完整示例代码
以下是一个使用rayon-core进行并行计算的完整示例:
use rayon_core::ThreadPoolBuilder;
fn main() {
// 创建一个自定义线程池
let pool = ThreadPoolBuilder::new()
.num_threads(4) // 设置线程数为4
.build()
.unwrap();
// 使用线程池并行处理数据
pool.install(|| {
let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// 并行迭代处理
data.par_iter_mut().for_each(|x| {
*x *= 2; // 每个元素乘以2
});
println!("{:?}", data); // 输出: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
});
// 使用rayon的join并行执行两个闭包
let (sum1, sum2) = rayon::join(
|| (1..=1000).sum::<i32>(),
|| (1001..=2000).sum::<i32>(),
);
println!("Total sum: {}", sum1 + sum2); // 输出: 2001000
}
主要功能
- 并行迭代器:提供par_iter()和par_iter_mut()等方法来并行处理集合
- join:并行执行两个闭包
- scope:创建可以生成并行任务的作用域
- 自定义线程池:通过ThreadPoolBuilder创建具有特定配置的线程池
许可证
MIT OR Apache-2.0
扩展示例代码
以下是一个更完整的rayon-core使用示例,展示了更多功能:
use rayon_core::{ThreadPoolBuilder, join, scope};
fn main() {
// 示例1: 使用全局线程池的并行迭代
let mut vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
vec.par_iter_mut().for_each(|x| {
*x *= 3; // 每个元素乘以3
});
println!("全局线程池处理结果: {:?}", vec);
// 示例2: 使用join并行计算斐波那契数列
fn fib(n: u32) -> u32 {
if n <= 1 { n } else {
let (a, b) = rayon::join(|| fib(n-1), || fib(n-2));
a + b
}
}
println!("fib(10) = {}", fib(10));
// 示例3: 使用scope创建并行任务作用域
let mut result = vec![];
scope(|s| {
for i in 0..10 {
s.spawn(|_| {
result.push(i * 2); // 并行填充结果
});
}
});
println!("scope结果: {:?}", result);
// 示例4: 自定义线程池配置
let pool = ThreadPoolBuilder::new()
.num_threads(2) // 限制为2个线程
.build()
.unwrap();
pool.install(|| {
let sum: i32 = (0..1000).into_par_iter().sum();
println!("自定义线程池计算结果: {}", sum);
});
}
1 回复
Rust并行计算库rayon-core使用指南
完整示例demo
基本并行迭代器示例
use rayon::prelude::*;
fn main() {
// 创建一个包含1-100的向量
let numbers: Vec<i32> = (1..=100).collect();
// 并行计算平方和
let sum_of_squares = numbers.par_iter()
.map(|&x| x * x) // 并行计算平方
.sum::<i32>(); // 并行求和
println!("Sum of squares from 1 to 100: {}", sum_of_squares);
// 并行过滤偶数并计算立方
let even_cubes: Vec<i32> = numbers.par_iter()
.filter(|&&x| x % 2 == 0) // 并行过滤偶数
.map(|&x| x * x * x) // 并行计算立方
.collect(); // 收集结果
println!("Cubes of even numbers: {:?}", even_cubes);
}
自定义线程池和并行排序示例
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::time::Instant;
fn main() {
// 创建自定义线程池
let pool = ThreadPoolBuilder::new()
.num_threads(8) // 使用8个线程
.build()
.unwrap();
// 生成大量随机数
let mut rng = rand::thread_rng();
let mut big_data: Vec<u32> = (0..1_000_000)
.map(|_| rng.gen_range(0..1_000_000))
.collect();
// 测量并行排序时间
let start = Instant::now();
pool.install(|| {
big_data.par_sort(); // 在线程池中并行排序
});
let duration = start.elapsed();
println!("Sorted 1,000,000 elements in: {:?}", duration);
// 验证前10个元素是否已排序
println!("First 10 elements: {:?}", &big_data[..10]);
}
复杂数据结构并行处理示例
use rayon::prelude::*;
use std::sync::Mutex;
#[derive(Debug, Clone)]
struct Employee {
id: u32,
name: String,
salary: f64,
department: String,
}
fn main() {
// 创建员工数据集
let employees = vec![
Employee { id: 1, name: "Alice".into(), salary: 85000.0, department: "Engineering".into() },
Employee { id: 2, name: "Bob".into(), salary: 75000.0, department: "Marketing".into() },
Employee { id: 3, name: "Charlie".into(), salary: 92000.0, department: "Engineering".into() },
Employee { id: 4, name: "Diana".into(), salary: 68000.0, department: "HR".into() },
Employee { id: 5, name: "Eve".into(), salary: 110000.0, department: "Engineering".into() },
];
// 并行计算各部门平均工资
let department_stats = employees.par_iter()
.fold(
|| std::collections::HashMap::<String, (f64, usize)>::new(),
|mut acc, emp| {
let entry = acc.entry(emp.department.clone()).or_insert((0.0, 0));
entry.0 += emp.salary;
entry.1 += 1;
acc
})
.reduce(
|| std::collections::HashMap::new(),
|mut a, b| {
for (dept, (sum, count)) in b {
let entry = a.entry(dept).or_insert((0.0, 0));
entry.0 += sum;
entry.1 += count;
}
a
});
// 打印结果
for (dept, (total, count)) in department_stats {
println!("{}: average salary = {:.2}", dept, total / count as f64);
}
// 使用Mutex处理共享状态(谨慎使用)
let total_salary = Mutex::new(0.0);
employees.par_iter().for_each(|emp| {
let mut sum = total_salary.lock().unwrap();
*sum += emp.salary;
});
println!("Total company salary: {:.2}", total_salary.lock().unwrap());
}
并行图像处理示例
use rayon::prelude::*;
use image::{RgbImage, Rgb};
fn apply_grayscale_parallel(img: &mut RgbImage) {
// 获取图像像素的并行迭代器
img.par_iter_mut().for_each(|pixel| {
// 计算灰度值 (使用标准灰度公式)
let gray = ((pixel[0] as f32 * 0.299) +
(pixel[1] as f32 * 0.587) +
(pixel[2] as f32 * 0.114)) as u8;
// 设置灰度值
*pixel = Rgb([gray, gray, gray]);
});
}
fn main() {
// 加载彩色图像
let mut img = image::open("input.jpg")
.expect("Failed to open image")
.to_rgb8();
// 并行转换为灰度图
apply_grayscale_parallel(&mut img);
// 保存结果
img.save("output.jpg")
.expect("Failed to save image");
println!("Image processing completed!");
}
最佳实践建议
- 数据大小评估:对于小于1000个元素的数据集,通常串行处理更快
- 任务粒度:确保每个并行任务至少有1微秒的工作量
- 避免锁竞争:尽量使用无锁设计或Rayon提供的并行归约操作
- 内存局部性:优化数据布局以提高缓存利用率
- 线程池配置:对于I/O密集型任务,可以增加线程数;对于纯计算任务,通常设置为CPU核心数
这些示例展示了rayon-core在不同场景下的应用,从简单的数据转换到复杂的并行算法实现。