Rust并发数据结构库left-right的使用,left-right提供高效无锁读写分离的并发访问方案

Rust并发数据结构库left-right的使用,left-right提供高效无锁读写分离的并发访问方案

介绍

Left-right是一个用于高并发读取的单写入者数据结构的并发原语。该原语保持两个备份数据结构副本,一个供读者访问,另一个供(单个)写入者访问。这使得所有读取操作都能以最小的协调并行进行,并将协调开销转移到写入者。在没有写入的情况下,读取操作随着核心数量的增加而线性扩展。

安装

在项目目录中运行以下Cargo命令:

cargo add left-right

或在Cargo.toml中添加以下行:

left-right = "0.11.5"

示例代码

以下是left-right库的基本使用示例:

use left_right::{Absorb, ReadHandle, WriteHandle};
use std::sync::Arc;
use std::thread;

// 定义我们的数据结构
#[derive(Default)]
struct MyData {
    counter: u32,
}

// 实现Absorb trait以支持合并操作
impl Absorb<u32> for MyData {
    fn absorb(&mut self, operation: &mut u32, _: &()) {
        self.counter += *operation;
    }
}

fn main() {
    // 创建读写句柄
    let (mut write_handle, read_handle) = left_right::new::<MyData, u32, ()>();
    
    // 创建共享的读句柄
    let shared_read = Arc::new(read_handle);
    
    // 启动多个读线程
    let mut reader_threads = vec![];
    for i in 0..5 {
        let reader = shared_read.clone();
        reader_threads.push(thread::spawn(move || {
            // 获取读锁
            let guard = reader.enter().unwrap();
            println!("Reader {} sees counter: {}", i, guard.counter);
        }));
    }
    
    // 在写线程中修改数据
    let writer_thread = thread::spawn(move || {
        for i in 1..=10 {
            // 写入操作
            write_handle.append(i);
            // 发布更改
            write_handle.publish();
            println!("Writer added {}", i);
        }
    });
    
    // 等待所有读线程完成
    for thread in reader_threads {
        thread.join().unwrap();
    }
    
    // 等待写线程完成
    writer_thread.join().unwrap();
    
    // 最终读取
    let final_read = shared_read.enter().unwrap();
    println!("Final counter value: {}", final_read.counter);
}

完整示例代码

以下是left-right库的完整使用示例,包含更多注释和详细说明:

use left_right::{Absorb, ReadHandle, WriteHandle};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// 定义我们的数据结构 - 一个简单的键值存储
#[derive(Default, Clone)]
struct KeyValueStore {
    data: std::collections::HashMap<String, String>,
}

// 定义操作类型
enum Operation {
    Insert(String, String),
    Remove(String),
    Clear,
}

// 实现Absorb trait以支持合并操作
impl Absorb<Operation> for KeyValueStore {
    fn absorb(&mut self, operation: &mut Operation, _: &()) {
        match operation {
            Operation::Insert(k, v) => {
                self.data.insert(k.clone(), v.clone());
            }
            Operation::Remove(k) => {
                self.data.remove(k);
            }
            Operation::Clear => {
                self.data.clear();
            }
        }
    }
}

fn main() {
    // 创建读写句柄
    let (mut write_handle, read_handle) = left_right::new::<KeyValueStore, Operation, ()>();
    
    // 创建共享的读句柄
    let shared_read = Arc::new(read_handle);
    
    // 启动多个读线程
    let mut reader_threads = vec![];
    for i in 0..3 {
        let reader = shared_read.clone();
        reader_threads.push(thread::spawn(move || {
            loop {
                // 获取读锁
                let guard = reader.enter().unwrap();
                let count = guard.data.len();
                println!("Reader {} sees {} items", i, count);
                
                // 打印所有键值对
                for (k, v) in guard.data.iter() {
                    println!("Reader {} - {}: {}", i, k, v);
                }
                
                // 如果数据为空,退出循环
                if count == 0 {
                    break;
                }
                
                // 暂停一会儿
                thread::sleep(Duration::from_millis(500));
            }
        }));
    }
    
    // 在写线程中修改数据
    let writer_thread = thread::spawn(move || {
        // 插入一些数据
        write_handle.append(Operation::Insert("key1".to_string(), "value1".to_string()));
        write_handle.append(Operation::Insert("key2".to_string(), "value2".to_string()));
        write_handle.publish();
        println!("Writer inserted 2 items");
        thread::sleep(Duration::from_secs(1));
        
        // 更新一个值
        write_handle.append(Operation::Insert("key1".to_string(), "new_value1".to_string()));
        write_handle.publish();
        println!("Writer updated key1");
        thread::sleep(Duration::from_secs(1));
        
        // 删除一个键
        write_handle.append(Operation::Remove("key2".to_string()));
        write_handle.publish();
        println!("Writer removed key2");
        thread::sleep(Duration::from_secs(1));
        
        // 清空数据
        write_handle.append(Operation::Clear);
        write_handle.publish();
        println!("Writer cleared all data");
    });
    
    // 等待写线程完成
    writer_thread.join().unwrap();
    
    // 等待所有读线程完成
    for thread in reader_threads {
        thread.join().unwrap();
    }
    
    // 最终读取
    let final_read = shared_read.enter().unwrap();
    println!("Final data count: {}", final_read.data.len());
}

代码说明

  1. 定义了一个键值存储数据结构KeyValueStore,使用HashMap作为底层存储
  2. 定义了多种操作类型Operation,包括插入、删除和清空
  3. 实现了Absorb trait,定义了如何处理各种操作
  4. 创建了读写句柄,读句柄被包装在Arc中以共享给多个线程
  5. 启动了多个读线程定期检查数据状态
  6. 写线程执行一系列操作:插入、更新、删除和清空
  7. 读线程会持续监控数据变化直到数据被清空
  8. 最后打印最终数据状态

特点

  • 无锁读取:读操作不需要锁,可以完全并行
  • 单写入者:只有一个线程可以进行写入
  • 线性扩展:在没有写入时,读取性能随核心数线性增长
  • 高效协调:协调开销主要由写入者承担
  • 支持多种操作类型:可以定义复杂的操作类型和合并逻辑

许可证

MIT OR Apache-2.0


1 回复

left-right:Rust高效无锁读写分离的并发数据结构库

简介

left-right是Rust中的一个并发数据结构库,它提供了一种高效的读写分离的无锁并发访问方案。该库的核心思想是通过维护两个数据副本(“left"和"right”)来实现无锁读取,同时允许写入操作在后台进行。

主要特点

  • 无锁读取:读取操作不需要获取锁,因此不会阻塞其他读取操作
  • 读写分离:读取和写入操作在不同的数据副本上进行
  • 高效并发:特别适合读多写少的场景
  • 线程安全:内置的同步机制保证线程安全

使用方法

基本使用

首先在Cargo.toml中添加依赖:

[dependencies]
left-right = "0.3"

创建left-right容器

use left_right::ReadHandleFactory;

let factory = ReadHandleFactory::new(0); // 初始值为0
let (mut write_handle, read_handle) = factory.into_handles();

写入操作

// 修改数据
write_handle.append(1);
write_handle.append(2);

// 提交修改,使读取端可见
write_handle.publish();

读取操作

// 获取读取句柄
let reader = read_handle.enter().unwrap();

// 读取数据
assert_eq!(*reader, 2); // 注意:此时只能看到已publish的数据

完整示例

use left_right::ReadHandleFactory;
use std::thread;

fn main() {
    // 创建left-right容器
    let factory = ReadHandleFactory::new(vec![]);
    let (mut write_handle, read_handle) = factory.into_handles();

    // 启动读取线程
    let reader_thread = thread::spawn(move || {
        for _ in 0..5 {
            let reader = read_handle.enter().unwrap();
            println!("Reader sees: {:?}", *reader);
            thread::sleep(std::time::Duration::from_millis(100));
        }
    });

    // 主线程进行写入
    for i in 1..=3 {
        write_handle.append(i);
        write_handle.publish();
        println!("Writer published {}", i);
        thread::sleep(std::time::Duration::from_millis(200));
    }

    reader_thread.join().unwrap();
}

高级用法

自定义数据结构

use left_right::{ReadHandleFactory, Absorb};

#[derive(Clone)]
struct MyData {
    value: i32,
}

impl Absorb<i32> for MyData {
    fn absorb_first(&mut self, operation: &mut i32, _other: &Self) {
        self.value += *operation;
    }
    
    fn absorb_second(&mut self, operation: i32, _other: &Self) {
        self.value += operation;
    }
    
    fn drop_first(self: Box<Self>) {}
}

let factory = ReadHandleFactory::new(MyData { value: 0 });
let (mut write_handle, read_handle) = factory.into_handles();

write_handle.append(10);
write_handle.publish();

let reader = read_handle.enter().unwrap();
assert_eq!(reader.value, 10);

批量操作

write_handle.append(1);
write_handle.append(2);
write_handle.append(3);
write_handle.publish();

let reader = read_handle.enter().unwrap();
assert_eq!(*reader, 3); // 取决于你的Absorb实现

性能考虑

  • 适合读多写少的场景
  • 写入操作需要复制数据,因此写入频繁的场景可能性能不佳
  • 读取操作完全无锁,性能极高

注意事项

  • 写入操作只有在调用publish()后才会对读取端可见
  • 需要为自定义类型实现Absorb trait
  • 读取端看到的数据可能不是最新的,取决于写入端是否已publish

left-right库为Rust提供了一种独特的并发访问模式,特别适合那些需要高吞吐量读取操作的场景。通过读写分离的设计,它避免了传统锁带来的争用问题,同时保证了线程安全。

回到顶部