Rust并发哈希映射库evmap-derive的使用:高效读写分离与零冲突Map实现

Rust并发哈希映射库evmap-derive的使用:高效读写分离与零冲突Map实现

安装

在项目目录中运行以下Cargo命令:

cargo add evmap-derive

或者在Cargo.toml中添加:

evmap-derive = "0.2.0"

示例代码

以下是一个使用evmap-derive实现高效读写分离哈希映射的完整示例:

use evmap::{ReadHandle, WriteHandle};
use std::thread;

// 定义一个简单的键值结构
#[derive(Debug, Clone)]
struct Data {
    key: String,
    value: i32,
}

fn main() {
    // 创建读写句柄
    let (mut write_handle, read_handle) = evmap::new();
    
    // 写入线程
    let writer = thread::spawn(move || {
        for i in 0..10 {
            let data = Data {
                key: format!("key_{}", i),
                value: i,
            };
            // 插入数据
            write_handle.insert(data.key.clone(), data);
            // 刷新使数据对读者可见
            write_handle.refresh();
            thread::sleep(std::time::Duration::from_millis(100));
        }
    });

    // 读取线程
    let reader = thread::spawn(move || {
        for _ in 0..5 {
            // 获取读取快照
            let map = read_handle.read();
            // 遍历所有值
            for (key, values) in map.iter() {
                println!("Key: {}, Values: {:?}", key, values);
            }
            thread::sleep(std::time::Duration::from_millis(200));
        }
    });

    writer.join().unwrap();
    reader.join().unwrap();
}

特性说明

  1. 读写分离:evmap-derive采用读写分离设计,写入操作不会阻塞读取操作

  2. 零冲突:使用多版本并发控制(MVCC)实现无锁读取

  3. 高效:读取操作几乎没有任何性能开销

  4. 线程安全:内置线程安全机制,无需额外同步

使用场景

  • 高读取频率、低写入频率的并发场景
  • 需要保持数据一致性的多线程应用
  • 实时数据分析和监控系统

完整示例代码

以下是扩展后的完整示例,展示了更多evmap-derive的功能:

use evmap::{ReadHandle, WriteHandle};
use std::thread;
use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq)]
struct SensorData {
    id: String,
    readings: Vec<f64>,
    timestamp: u64,
}

fn main() {
    // 创建读写句柄
    let (mut writer, reader) = evmap::new();
    
    // 模拟数据写入线程
    let writer_thread = thread::spawn(move || {
        // 初始数据写入
        let initial_data = vec![
            SensorData {
                id: "sensor_1".to_string(),
                readings: vec![23.5, 24.1, 22.9],
                timestamp: 1000,
            },
            SensorData {
                id: "sensor_2".to_string(),
                readings: vec![45.2, 46.0, 44.8],
                timestamp: 1000,
            },
        ];
        
        for data in initial_data {
            writer.insert(data.id.clone(), data);
        }
        writer.refresh();  // 使初始数据对读者可见
        
        // 模拟实时更新
        for i in 1..=5 {
            thread::sleep(std::time::Duration::from_millis(500));
            
            // 更新传感器1的数据
            let update = SensorData {
                id: "sensor_1".to_string(),
                readings: vec![20.0 + (i as f64 * 0.5)],
                timestamp: 1000 + i * 100,
            };
            
            // 使用update方法合并数据
            writer.update(update.id.clone(), |existing| {
                let mut new = existing[0].clone();
                new.readings.extend(update.readings.iter().cloned());
                new.timestamp = update.timestamp;
                new
            });
            
            writer.refresh();
        }
    });

    // 数据读取线程1 - 定期快照
    let reader1 = reader.clone();
    let thread1 = thread::spawn(move || {
        for _ in 0..3 {
            thread::sleep(std::time::Duration::from_secs(1));
            
            // 获取读取快照
            let snapshot = reader1.read();
            println!("[Reader1] 当前传感器数量: {}", snapshot.len());
            
            // 检查特定键是否存在
            if snapshot.contains_key("sensor_1") {
                println!("[Reader1] sensor_1 最新数据: {:?}", 
                    snapshot.get_one("sensor_1").unwrap());
            }
        }
    });

    // 数据读取线程2 - 聚合统计
    let thread2 = thread::spawn(move || {
        let mut seen_ids = HashSet::new();
        
        while seen_ids.len() < 2 {
            if let Some(sensors) = reader.read().into_alternate() {
                for id in sensors.keys() {
                    seen_ids.insert(id.clone());
                }
                
                // 计算所有传感器的平均值
                let total: f64 = sensors.values()
                    .flat_map(|v| v.iter().map(|d| d.readings.iter().sum::<f64>()))
                    .sum();
                let count: usize = sensors.values()
                    .map(|v| v.iter().map(|d| d.readings.len()).sum::<usize>())
                    .sum();
                
                if count > 0 {
                    println!("[Reader2] 平均读数: {:.2}", total / count as f64);
                }
            }
            thread::sleep(std::time::Duration::from_millis(300));
        }
    });

    writer_thread.join().unwrap();
    thread1.join().unwrap();
    thread2.join().unwrap();
}

这个完整示例展示了:

  1. 多线程并发读写
  2. 数据更新和合并操作
  3. 多种读取模式(快照、聚合统计)
  4. 键存在性检查
  5. 更复杂的数据结构处理

1 回复

Rust并发哈希映射库evmap-derive的使用:高效读写分离与零冲突Map实现

介绍

evmap-derive是基于evmap库的派生宏实现,提供了一个高效读写分离的并发哈希映射实现。它采用了"多读单写"的设计模式,允许无锁读取和线程安全写入,特别适合读多写少的场景。

主要特点:

  • 零冲突:读操作永远不会与写操作冲突
  • 高性能:读取完全无锁,写入只需最小化同步
  • 派生宏支持:简化使用方式
  • 原子更新:写操作是原子的

完整示例代码

下面是一个完整的示例,展示了如何使用evmap-derive实现一个简单的用户管理系统:

// 引入必要的库
use evmap::ReadHandle;
use evmap_derive::ShallowCopy;
use std::thread;
use std::time::Duration;

// 定义用户结构体并派生必要的trait
#[derive(ShallowCopy, Clone, Debug)]
struct User {
    id: u64,
    username: String,
    email: String,
}

fn main() {
    // 创建evmap实例,获取读写句柄
    let (mut writer, reader) = evmap::new();
    
    // 启动写入线程
    let writer_thread = thread::spawn(move || {
        // 模拟用户注册
        for i in 1..=100 {
            let user = User {
                id: i,
                username: format!("user_{}", i),
                email: format!("user_{}@example.com", i),
            };
            
            // 插入用户数据
            writer.insert(i, user);
            
            // 每插入10个用户刷新一次
            if i % 10 == 0 {
                writer.refresh();
                println!("[Writer] 已刷新第{}个用户", i);
            }
            
            thread::sleep(Duration::from_millis(50));
        }
        
        // 最后确保所有写入都刷新
        writer.refresh();
    });
    
    // 启动3个读取线程
    let reader_threads: Vec<_> = (0..3).map(|i| {
        let reader = reader.clone();
        thread::spawn(move || {
            for _ in 0..30 {
                // 随机查询一个用户
                let user_id = rand::random::<u64>() % 100 + 1;
                
                // 使用get_and方法获取用户数据
                if let Some(user) = reader.get_and(&user_id, |users| users[0].clone()) {
                    println!("[Reader {}] 查询到用户 {}: {:?}", i, user_id, user);
                } else {
                    println!("[Reader {}] 用户 {} 不存在", i, user_id);
                }
                
                thread::sleep(Duration::from_millis(100));
            }
        })
    }).collect();
    
    // 等待所有线程完成
    writer_thread.join().unwrap();
    for thread in reader_threads {
        thread.join().unwrap();
    }
    
    // 批量操作示例
    batch_operations();
}

// 批量操作演示函数
fn batch_operations() {
    let (mut writer, reader) = evmap::new();
    
    // 批量插入数据
    let data = vec![
        ("apple", 10),
        ("banana", 20),
        ("orange", 15),
        ("pear", 8),
    ];
    
    println!("\n批量操作演示:");
    
    writer.extend(data.into_iter());
    writer.refresh();
    
    // 批量查询
    if let Some(apple_price) = reader.get_and("apple", |v| v[0]) {
        println!("苹果的价格: {}", apple_price);
    }
    
    if let Some(banana_price) = reader.get_and("banana", |v| v[0]) {
        println!("香蕉的价格: {}", banana_price);
    }
    
    // 批量更新
    writer.update("apple", 12);  // 更新苹果价格
    writer.remove("pear");       // 删除梨子
    writer.refresh();
    
    println!("更新后的苹果价格: {}", reader.get_and("apple", |v| v[0]).unwrap());
    println!("梨子是否还存在: {}", reader.contains_key("pear"));
}

代码说明

  1. 数据结构定义

    • User结构体使用#[derive(ShallowCopy)]宏,这是evmap-derive要求的
  2. 读写分离

    • 通过evmap::new()创建读写句柄
    • 写入线程使用writer插入数据
    • 读取线程使用reader查询数据
  3. 数据刷新

    • 写入后必须调用refresh()使更改对读者可见
    • 示例中每插入10个用户刷新一次
  4. 并发安全

    • 多个读取线程可以同时无锁访问数据
    • 写入线程是唯一的,保证数据一致性
  5. 批量操作

    • 使用extend方法批量插入数据
    • 使用updateremove修改数据

注意事项

  1. 写入后必须调用refresh()才能使更改对读者可见
  2. 读取操作获取的是数据的快照,不会反映之后的写入
  3. 适合读多写少的场景,频繁写入可能影响性能
  4. 自定义类型需要实现ShallowCopy trait(通过派生宏)

evmap-derive提供了比标准库HashMap更好的并发读取性能,特别适合缓存、配置管理等读多写少的场景。

回到顶部