Rust并行计算库rayon-core的使用,高性能数据并行处理与多线程任务调度库

Rust并行计算库rayon-core的使用,高性能数据并行处理与多线程任务调度库

注意:这是一个为rustc使用而创建的不稳定分支

Rayon-core代表了Rayon的"核心、稳定"API:join、scope等,以及使用ThreadPool创建自定义线程池的能力。

值得注意:用户不一定需要直接访问rayon-core;它的所有API都在rayon crate中有镜像。为此,文档中的示例使用rayon::join等而不是rayon_core::join。

rayon-core的目标是永远或几乎永远不会对其API进行破坏性更改,因为rayon-core的每个修订版也托管全局线程池(因此如果您有两个同时存在的rayon-core版本,您将有两个线程池)。

rayon-core目前需要rustc 1.63.0或更高版本。

安装

在项目目录中运行以下Cargo命令:

cargo add rustc-rayon-core

或者在Cargo.toml中添加以下行:

rustc-rayon-core = "0.5.1"

完整示例代码

以下是一个使用rayon-core进行并行计算的完整示例:

use rayon_core::ThreadPoolBuilder;

fn main() {
    // 创建一个自定义线程池
    let pool = ThreadPoolBuilder::new()
        .num_threads(4)  // 设置线程数为4
        .build()
        .unwrap();

    // 使用线程池并行处理数据
    pool.install(|| {
        let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        
        // 并行迭代处理
        data.par_iter_mut().for_each(|x| {
            *x *= 2;  // 每个元素乘以2
        });

        println!("{:?}", data);  // 输出: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    });

    // 使用rayon的join并行执行两个闭包
    let (sum1, sum2) = rayon::join(
        || (1..=1000).sum::<i32>(),
        || (1001..=2000).sum::<i32>(),
    );
    
    println!("Total sum: {}", sum1 + sum2);  // 输出: 2001000
}

主要功能

  1. 并行迭代器:提供par_iter()和par_iter_mut()等方法来并行处理集合
  2. join:并行执行两个闭包
  3. scope:创建可以生成并行任务的作用域
  4. 自定义线程池:通过ThreadPoolBuilder创建具有特定配置的线程池

许可证

MIT OR Apache-2.0

扩展示例代码

以下是一个更完整的rayon-core使用示例,展示了更多功能:

use rayon_core::{ThreadPoolBuilder, join, scope};

fn main() {
    // 示例1: 使用全局线程池的并行迭代
    let mut vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    vec.par_iter_mut().for_each(|x| {
        *x *= 3;  // 每个元素乘以3
    });
    println!("全局线程池处理结果: {:?}", vec);

    // 示例2: 使用join并行计算斐波那契数列
    fn fib(n: u32) -> u32 {
        if n <= 1 { n } else {
            let (a, b) = rayon::join(|| fib(n-1), || fib(n-2));
            a + b
        }
    }
    println!("fib(10) = {}", fib(10));

    // 示例3: 使用scope创建并行任务作用域
    let mut result = vec![];
    scope(|s| {
        for i in 0..10 {
            s.spawn(|_| {
                result.push(i * 2);  // 并行填充结果
            });
        }
    });
    println!("scope结果: {:?}", result);

    // 示例4: 自定义线程池配置
    let pool = ThreadPoolBuilder::new()
        .num_threads(2)  // 限制为2个线程
        .build()
        .unwrap();

    pool.install(|| {
        let sum: i32 = (0..1000).into_par_iter().sum();
        println!("自定义线程池计算结果: {}", sum);
    });
}

1 回复

Rust并行计算库rayon-core使用指南

完整示例demo

基本并行迭代器示例

use rayon::prelude::*;

fn main() {
    // 创建一个包含1-100的向量
    let numbers: Vec<i32> = (1..=100).collect();
    
    // 并行计算平方和
    let sum_of_squares = numbers.par_iter()
        .map(|&x| x * x)  // 并行计算平方
        .sum::<i32>();    // 并行求和
    
    println!("Sum of squares from 1 to 100: {}", sum_of_squares);
    
    // 并行过滤偶数并计算立方
    let even_cubes: Vec<i32> = numbers.par_iter()
        .filter(|&&x| x % 2 == 0)  // 并行过滤偶数
        .map(|&x| x * x * x)       // 并行计算立方
        .collect();                // 收集结果
    
    println!("Cubes of even numbers: {:?}", even_cubes);
}

自定义线程池和并行排序示例

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::time::Instant;

fn main() {
    // 创建自定义线程池
    let pool = ThreadPoolBuilder::new()
        .num_threads(8)  // 使用8个线程
        .build()
        .unwrap();
    
    // 生成大量随机数
    let mut rng = rand::thread_rng();
    let mut big_data: Vec<u32> = (0..1_000_000)
        .map(|_| rng.gen_range(0..1_000_000))
        .collect();
    
    // 测量并行排序时间
    let start = Instant::now();
    pool.install(|| {
        big_data.par_sort();  // 在线程池中并行排序
    });
    let duration = start.elapsed();
    
    println!("Sorted 1,000,000 elements in: {:?}", duration);
    
    // 验证前10个元素是否已排序
    println!("First 10 elements: {:?}", &big_data[..10]);
}

复杂数据结构并行处理示例

use rayon::prelude::*;
use std::sync::Mutex;

#[derive(Debug, Clone)]
struct Employee {
    id: u32,
    name: String,
    salary: f64,
    department: String,
}

fn main() {
    // 创建员工数据集
    let employees = vec![
        Employee { id: 1, name: "Alice".into(), salary: 85000.0, department: "Engineering".into() },
        Employee { id: 2, name: "Bob".into(), salary: 75000.0, department: "Marketing".into() },
        Employee { id: 3, name: "Charlie".into(), salary: 92000.0, department: "Engineering".into() },
        Employee { id: 4, name: "Diana".into(), salary: 68000.0, department: "HR".into() },
        Employee { id: 5, name: "Eve".into(), salary: 110000.0, department: "Engineering".into() },
    ];
    
    // 并行计算各部门平均工资
    let department_stats = employees.par_iter()
        .fold(
            || std::collections::HashMap::<String, (f64, usize)>::new(),
            |mut acc, emp| {
                let entry = acc.entry(emp.department.clone()).or_insert((0.0, 0));
                entry.0 += emp.salary;
                entry.1 += 1;
                acc
            })
        .reduce(
            || std::collections::HashMap::new(),
            |mut a, b| {
                for (dept, (sum, count)) in b {
                    let entry = a.entry(dept).or_insert((0.0, 0));
                    entry.0 += sum;
                    entry.1 += count;
                }
                a
            });
    
    // 打印结果
    for (dept, (total, count)) in department_stats {
        println!("{}: average salary = {:.2}", dept, total / count as f64);
    }
    
    // 使用Mutex处理共享状态(谨慎使用)
    let total_salary = Mutex::new(0.0);
    employees.par_iter().for_each(|emp| {
        let mut sum = total_salary.lock().unwrap();
        *sum += emp.salary;
    });
    
    println!("Total company salary: {:.2}", total_salary.lock().unwrap());
}

并行图像处理示例

use rayon::prelude::*;
use image::{RgbImage, Rgb};

fn apply_grayscale_parallel(img: &mut RgbImage) {
    // 获取图像像素的并行迭代器
    img.par_iter_mut().for_each(|pixel| {
        // 计算灰度值 (使用标准灰度公式)
        let gray = ((pixel[0] as f32 * 0.299) + 
                   (pixel[1] as f32 * 0.587) + 
                   (pixel[2] as f32 * 0.114)) as u8;
        
        // 设置灰度值
        *pixel = Rgb([gray, gray, gray]);
    });
}

fn main() {
    // 加载彩色图像
    let mut img = image::open("input.jpg")
        .expect("Failed to open image")
        .to_rgb8();
    
    // 并行转换为灰度图
    apply_grayscale_parallel(&mut img);
    
    // 保存结果
    img.save("output.jpg")
        .expect("Failed to save image");
    
    println!("Image processing completed!");
}

最佳实践建议

  1. 数据大小评估:对于小于1000个元素的数据集,通常串行处理更快
  2. 任务粒度:确保每个并行任务至少有1微秒的工作量
  3. 避免锁竞争:尽量使用无锁设计或Rayon提供的并行归约操作
  4. 内存局部性:优化数据布局以提高缓存利用率
  5. 线程池配置:对于I/O密集型任务,可以增加线程数;对于纯计算任务,通常设置为CPU核心数

这些示例展示了rayon-core在不同场景下的应用,从简单的数据转换到复杂的并行算法实现。

回到顶部