Rust并发数据结构库left-right的使用,left-right提供高效无锁读写分离的并发访问方案
Rust并发数据结构库left-right的使用,left-right提供高效无锁读写分离的并发访问方案
介绍
Left-right是一个用于高并发读取的单写入者数据结构的并发原语。该原语保持两个备份数据结构副本,一个供读者访问,另一个供(单个)写入者访问。这使得所有读取操作都能以最小的协调并行进行,并将协调开销转移到写入者。在没有写入的情况下,读取操作随着核心数量的增加而线性扩展。
安装
在项目目录中运行以下Cargo命令:
cargo add left-right
或在Cargo.toml中添加以下行:
left-right = "0.11.5"
示例代码
以下是left-right库的基本使用示例:
use left_right::{Absorb, ReadHandle, WriteHandle};
use std::sync::Arc;
use std::thread;
// 定义我们的数据结构
#[derive(Default)]
struct MyData {
counter: u32,
}
// 实现Absorb trait以支持合并操作
impl Absorb<u32> for MyData {
fn absorb(&mut self, operation: &mut u32, _: &()) {
self.counter += *operation;
}
}
fn main() {
// 创建读写句柄
let (mut write_handle, read_handle) = left_right::new::<MyData, u32, ()>();
// 创建共享的读句柄
let shared_read = Arc::new(read_handle);
// 启动多个读线程
let mut reader_threads = vec![];
for i in 0..5 {
let reader = shared_read.clone();
reader_threads.push(thread::spawn(move || {
// 获取读锁
let guard = reader.enter().unwrap();
println!("Reader {} sees counter: {}", i, guard.counter);
}));
}
// 在写线程中修改数据
let writer_thread = thread::spawn(move || {
for i in 1..=10 {
// 写入操作
write_handle.append(i);
// 发布更改
write_handle.publish();
println!("Writer added {}", i);
}
});
// 等待所有读线程完成
for thread in reader_threads {
thread.join().unwrap();
}
// 等待写线程完成
writer_thread.join().unwrap();
// 最终读取
let final_read = shared_read.enter().unwrap();
println!("Final counter value: {}", final_read.counter);
}
完整示例代码
以下是left-right库的完整使用示例,包含更多注释和详细说明:
use left_right::{Absorb, ReadHandle, WriteHandle};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
// 定义我们的数据结构 - 一个简单的键值存储
#[derive(Default, Clone)]
struct KeyValueStore {
data: std::collections::HashMap<String, String>,
}
// 定义操作类型
enum Operation {
Insert(String, String),
Remove(String),
Clear,
}
// 实现Absorb trait以支持合并操作
impl Absorb<Operation> for KeyValueStore {
fn absorb(&mut self, operation: &mut Operation, _: &()) {
match operation {
Operation::Insert(k, v) => {
self.data.insert(k.clone(), v.clone());
}
Operation::Remove(k) => {
self.data.remove(k);
}
Operation::Clear => {
self.data.clear();
}
}
}
}
fn main() {
// 创建读写句柄
let (mut write_handle, read_handle) = left_right::new::<KeyValueStore, Operation, ()>();
// 创建共享的读句柄
let shared_read = Arc::new(read_handle);
// 启动多个读线程
let mut reader_threads = vec![];
for i in 0..3 {
let reader = shared_read.clone();
reader_threads.push(thread::spawn(move || {
loop {
// 获取读锁
let guard = reader.enter().unwrap();
let count = guard.data.len();
println!("Reader {} sees {} items", i, count);
// 打印所有键值对
for (k, v) in guard.data.iter() {
println!("Reader {} - {}: {}", i, k, v);
}
// 如果数据为空,退出循环
if count == 0 {
break;
}
// 暂停一会儿
thread::sleep(Duration::from_millis(500));
}
}));
}
// 在写线程中修改数据
let writer_thread = thread::spawn(move || {
// 插入一些数据
write_handle.append(Operation::Insert("key1".to_string(), "value1".to_string()));
write_handle.append(Operation::Insert("key2".to_string(), "value2".to_string()));
write_handle.publish();
println!("Writer inserted 2 items");
thread::sleep(Duration::from_secs(1));
// 更新一个值
write_handle.append(Operation::Insert("key1".to_string(), "new_value1".to_string()));
write_handle.publish();
println!("Writer updated key1");
thread::sleep(Duration::from_secs(1));
// 删除一个键
write_handle.append(Operation::Remove("key2".to_string()));
write_handle.publish();
println!("Writer removed key2");
thread::sleep(Duration::from_secs(1));
// 清空数据
write_handle.append(Operation::Clear);
write_handle.publish();
println!("Writer cleared all data");
});
// 等待写线程完成
writer_thread.join().unwrap();
// 等待所有读线程完成
for thread in reader_threads {
thread.join().unwrap();
}
// 最终读取
let final_read = shared_read.enter().unwrap();
println!("Final data count: {}", final_read.data.len());
}
代码说明
- 定义了一个键值存储数据结构
KeyValueStore
,使用HashMap
作为底层存储 - 定义了多种操作类型
Operation
,包括插入、删除和清空 - 实现了
Absorb
trait,定义了如何处理各种操作 - 创建了读写句柄,读句柄被包装在
Arc
中以共享给多个线程 - 启动了多个读线程定期检查数据状态
- 写线程执行一系列操作:插入、更新、删除和清空
- 读线程会持续监控数据变化直到数据被清空
- 最后打印最终数据状态
特点
- 无锁读取:读操作不需要锁,可以完全并行
- 单写入者:只有一个线程可以进行写入
- 线性扩展:在没有写入时,读取性能随核心数线性增长
- 高效协调:协调开销主要由写入者承担
- 支持多种操作类型:可以定义复杂的操作类型和合并逻辑
许可证
MIT OR Apache-2.0
1 回复
left-right:Rust高效无锁读写分离的并发数据结构库
简介
left-right是Rust中的一个并发数据结构库,它提供了一种高效的读写分离的无锁并发访问方案。该库的核心思想是通过维护两个数据副本(“left"和"right”)来实现无锁读取,同时允许写入操作在后台进行。
主要特点
- 无锁读取:读取操作不需要获取锁,因此不会阻塞其他读取操作
- 读写分离:读取和写入操作在不同的数据副本上进行
- 高效并发:特别适合读多写少的场景
- 线程安全:内置的同步机制保证线程安全
使用方法
基本使用
首先在Cargo.toml中添加依赖:
[dependencies]
left-right = "0.3"
创建left-right容器
use left_right::ReadHandleFactory;
let factory = ReadHandleFactory::new(0); // 初始值为0
let (mut write_handle, read_handle) = factory.into_handles();
写入操作
// 修改数据
write_handle.append(1);
write_handle.append(2);
// 提交修改,使读取端可见
write_handle.publish();
读取操作
// 获取读取句柄
let reader = read_handle.enter().unwrap();
// 读取数据
assert_eq!(*reader, 2); // 注意:此时只能看到已publish的数据
完整示例
use left_right::ReadHandleFactory;
use std::thread;
fn main() {
// 创建left-right容器
let factory = ReadHandleFactory::new(vec![]);
let (mut write_handle, read_handle) = factory.into_handles();
// 启动读取线程
let reader_thread = thread::spawn(move || {
for _ in 0..5 {
let reader = read_handle.enter().unwrap();
println!("Reader sees: {:?}", *reader);
thread::sleep(std::time::Duration::from_millis(100));
}
});
// 主线程进行写入
for i in 1..=3 {
write_handle.append(i);
write_handle.publish();
println!("Writer published {}", i);
thread::sleep(std::time::Duration::from_millis(200));
}
reader_thread.join().unwrap();
}
高级用法
自定义数据结构
use left_right::{ReadHandleFactory, Absorb};
#[derive(Clone)]
struct MyData {
value: i32,
}
impl Absorb<i32> for MyData {
fn absorb_first(&mut self, operation: &mut i32, _other: &Self) {
self.value += *operation;
}
fn absorb_second(&mut self, operation: i32, _other: &Self) {
self.value += operation;
}
fn drop_first(self: Box<Self>) {}
}
let factory = ReadHandleFactory::new(MyData { value: 0 });
let (mut write_handle, read_handle) = factory.into_handles();
write_handle.append(10);
write_handle.publish();
let reader = read_handle.enter().unwrap();
assert_eq!(reader.value, 10);
批量操作
write_handle.append(1);
write_handle.append(2);
write_handle.append(3);
write_handle.publish();
let reader = read_handle.enter().unwrap();
assert_eq!(*reader, 3); // 取决于你的Absorb实现
性能考虑
- 适合读多写少的场景
- 写入操作需要复制数据,因此写入频繁的场景可能性能不佳
- 读取操作完全无锁,性能极高
注意事项
- 写入操作只有在调用
publish()
后才会对读取端可见 - 需要为自定义类型实现
Absorb
trait - 读取端看到的数据可能不是最新的,取决于写入端是否已publish
left-right库为Rust提供了一种独特的并发访问模式,特别适合那些需要高吞吐量读取操作的场景。通过读写分离的设计,它避免了传统锁带来的争用问题,同时保证了线程安全。