Rust分布式存储引擎raft-engine的使用,高效实现Raft共识算法的持久化与状态管理

Rust分布式存储引擎raft-engine的使用,高效实现Raft共识算法的持久化与状态管理

Raft Engine简介

Raft Engine是一个持久化的嵌入式存储引擎,采用类似bitcask的日志结构化设计。它是为TiKV设计的,用于存储Multi-Raft日志。

主要特性

  • 提供API用于存储和检索具有连续索引的protobuf日志条目
  • 为各个Raft Group提供键值存储
  • 最小化写入放大
  • 协作式垃圾回收
  • 支持对日志条目进行lz4压缩
  • 支持文件系统扩展

设计架构

Raft Engine由两个基本组件构成:memtable和日志文件。

在内存中,每个Raft Group都有自己的memtable,包含所有键值对和所有日志条目的文件位置。在存储上,用户写入被顺序写入活动日志文件,该文件会根据配置的阈值定期轮换。不同的Raft Group共享相同的日志流。

写入流程

与RocksDB类似,Raft Engine提供原子写入。用户可以在提交前将更改存入日志批次。

一个日志批次的写入可以分为三个步骤:

  1. 可选地压缩日志条目
  2. 写入日志文件
  3. 应用到memtable

在第2步,为了分组并发请求,每个写入线程必须进入队列。队列中的第一个自动成为队列领导者,负责将整个组写入日志文件。

支持同步和非同步写入。当批次中的一个写入被标记为同步时,批次领导者将在写入后调用fdatasync()。这样,缓冲数据可以保证被刷新到存储上。

数据写入后,每个写入线程将继续独立地将更改应用到memtable。

垃圾回收

当更改应用到本地状态机后,相应的日志条目可以从Raft Engine中逻辑压缩。由于多个Raft Group共享相同的日志流,这些截断的日志会在日志文件中形成空洞。在垃圾回收期间,Raft Engine扫描这些空洞并压缩日志文件以释放存储空间。只有在这个时候,不需要的日志条目才会被物理删除。

Raft Engine以协作方式执行垃圾回收。

首先,它的时间由用户控制。只有当用户主动调用purge_expired_files()例程时,Raft Engine才会合并和删除其日志文件。作为参考,TiKV默认每10秒调用一次。

其次,它向用户提供有用的反馈。每次调用GC例程时,Raft Engine都会检查自身并返回持有特别旧的日志条目的Raft Group列表。这些日志条目阻碍了GC进度,应该由用户进行压缩。

使用示例

在Cargo.toml中添加依赖:

[dependencies]
raft-engine = "0.4"

可用Cargo特性:

  • scripting: 编译时包含Rhai。这启用了脚本调试工具,包括unsafe_repair
  • nightly: 启用仅限nightly的特性,包括test
  • internals: 重新导出Raft Engine内部的关键组件。为docs.rs构建时启用
  • failpoints: 启用由tikv/fail-rs支持的故障点测试
  • swap: 使用SwappyAllocator限制Raft Engine的内存使用。内存预算可以用"memory-limit"配置。依赖于nightly特性

完整示例代码

use raft_engine::{Config, Engine, LogBatch};
use prost::Message;

// 定义日志条目Protobuf消息
#[derive(Clone, Message)]
pub struct Entry {
    #[prost(uint64, tag = "1")]
    pub index: u64,
    #[prost(bytes, tag = "2")]
    pub data: Vec<u8>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 配置Raft Engine
    let cfg = Config {
        dir: "./raft-engine".to_string(),
        file_size: 128 * 1024 * 1024, // 128MB
        purge_threshold: 16 * 1024 * 1024, // 16MB
        ..Default::default()
    };
    
    // 创建引擎实例
    let engine = Engine::open(cfg)?;
    
    // 模拟Raft Group ID
    let raft_group_id = 1;
    
    // 创建日志批次
    let mut batch = LogBatch::default();
    
    // 准备日志条目
    let entries = vec![
        Entry {
            index: 1,
            data: b"first log entry".to_vec(),
        },
        Entry {
            index: 2,
            data: b"second log entry".to_vec(),
        },
    ];
    
    // 将条目添加到批次
    for entry in entries {
        let mut buf = Vec::new();
        entry.encode(&mut buf)?;
        batch.add_entries(raft_group_id, &[entry.index], &[buf]);
    }
    
    // 写入引擎
    engine.write(&mut batch, false)?;
    
    // 读取日志条目
    if let Some(data) = engine.get_entry(raft_group_id, 1)? {
        let entry = Entry::decode(&data[..])?;
        println!("Read entry at index 1: {:?}", entry);
    }
    
    // 模拟日志压缩
    engine.compact_to(raft_group_id, 1)?;
    
    // 执行垃圾回收
    engine.purge_expired_files()?;
    
    Ok(())
}

贡献指南

欢迎贡献!以下是提交PR的一些提示:

  • 所有提交必须签名(使用git commit -s)以通过DCO检查
  • 更改会自动运行测试,其中一些可以在本地运行:
# 使用nightly特性运行测试
make
# 在稳定工具链上运行测试
make WITH_STABLE_TOOLCHAIN=force
# 过滤特定测试用例
make test EXTRA_CARGO_ARGS=<testname>

许可证

版权所有 © 2017-present, PingCAP, Inc. 根据Apache 2.0许可证发布。


1 回复

Rust分布式存储引擎raft-engine的使用

介绍

raft-engine是一个专为Raft共识算法设计的持久化存储引擎,它高效地实现了Raft日志的持久化和状态管理。这个库由PingCAP团队开发,主要用于TiKV项目,但也可以作为独立的存储引擎在其他Raft实现中使用。

raft-engine的主要特点包括:

  • 高性能的日志存储和检索
  • 自动压缩和垃圾回收
  • 批量写入优化
  • 支持快照
  • 线程安全的设计

安装

在Cargo.toml中添加依赖:

[dependencies]
raft-engine = "0.4"

基本使用方法

1. 创建存储引擎

use raft_engine::{Config, Engine};

fn main() {
    // 配置存储引擎
    let cfg = Config {
        dir: "./raft-engine-data".to_string(),
        ..Default::default()
    };
    
    // 创建引擎实例
    let engine = Engine::open(cfg).unwrap();
}

2. 写入Raft日志

use raft_engine::LogBatch;

let mut lb = LogBatch::default();
let region_id = 1;
let index = 1;
let data = b"some raft log data";

// 添加日志条目
lb.add_entries(region_id, &[(index, data)]);

// 写入引擎
engine.write(&mut lb, false).unwrap();

3. 读取Raft日志

// 读取单个日志条目
if let Some(entry) = engine.get_entry(region_id, index).unwrap() {
    println!("Got entry: {:?}", entry);
}

// 读取日志范围
let entries = engine.fetch_entries_to(
    region_id, 
    index, 
    index + 10, 
    None, 
    u64::MAX
).unwrap();
println!("Fetched entries: {:?}", entries);

4. 处理快照

// 保存快照
let snapshot_index = 10;
engine.purge_expired_files().unwrap();
engine.clean(region_id, snapshot_index).unwrap();

// 应用快照后,可以安全地删除旧日志
engine.compact_to(region_id, snapshot_index).unwrap();

高级功能

批量操作

let mut lb = LogBatch::default();
for i in 0..100 {
    let data = format!("log entry {}", i).into_bytes();
    lb.add_entries(region_id, &[(i, &data)]);
}

// 批量写入
engine.write(&mut lb, true).unwrap();

配置调优

let cfg = Config {
    dir: "./raft-engine-data".to_string(),
    batch_size: 8 * 1024 * 1024, // 8MB
    purge_threshold: 16 * 1024 * 1024, // 16MB
    ..Default::default()
};

性能建议

  1. 尽量使用批量写入而不是单条写入
  2. 根据工作负载调整batch_sizepurge_threshold
  3. 定期调用purge_expired_filescompact_to进行维护
  4. 对于高吞吐场景,考虑使用write方法的第二个参数设置为true启用sync

完整示例代码

use raft_engine::{Config, Engine, LogBatch};
use raft::prelude::*;
use std::path::Path;

// 自定义Raft存储实现
struct RaftStore {
    engine: Engine,
    region_id: u64,
}

impl RaftStore {
    // 创建新的存储实例
    fn new<P: AsRef<Path>>(path: P, region_id: u64) -> Self {
        let cfg = Config {
            dir: path.as_ref().to_str().unwrap().to_string(),
            batch_size: 8 * 1024 * 1024,  // 8MB
            purge_threshold: 16 * 1024 * 1024,  // 16MB
            ..Default::default()
        };
        
        let engine = Engine::open(cfg).expect("Failed to open raft engine");
        Self { engine, region_id }
    }

    // 追加日志条目
    fn append(&self, entries: &[Entry]) -> Result<()> {
        let mut batch = LogBatch::default();
        for entry in entries {
            batch.add_entries(
                self.region_id,
                &[(entry.index, &entry.data)],
            );
        }
        self.engine.write(&mut batch, false)?;
        Ok(())
    }

    // 获取日志条目范围
    fn entries(&self, low: u64, high: u64) -> Result<Vec<Entry>> {
        let entries = self.engine
            .fetch_entries_to(self.region_id, low, high, None, u64::MAX)?
            .into_iter()
            .map(|(index, data)| Entry {
                entry_type: EntryType::EntryNormal,
                term: 1,  // 示例中简化处理,实际应从日志中获取
                index,
                data,
                context: vec![],
                sync_log: false,
            })
            .collect();
        
        Ok(entries)
    }

    // 创建快照
    fn snapshot(&self, index: u64) -> Result<()> {
        self.engine.purge_expired_files()?;
        self.engine.clean(self.region_id, index)?;
        Ok(())
    }

    // 压缩日志
    fn compact(&self, index: u64) -> Result<()> {
        self.engine.compact_to(self.region_id, index)?;
        Ok(())
    }
}

// 示例使用
fn main() -> Result<()> {
    // 初始化存储
    let store = RaftStore::new("./raft-data", 1);
    
    // 写入示例日志
    let entries = vec![
        Entry {
            entry_type: EntryType::EntryNormal,
            term: 1,
            index: 1,
            data: b"first log".to_vec(),
            context: vec![],
            sync_log: false,
        },
        Entry {
            entry_type: EntryType::EntryNormal,
            term: 1,
            index: 2,
            data: b"second log".to_vec(),
            context: vec![],
            sync_log: false,
        },
    ];
    
    store.append(&entries)?;
    
    // 读取日志
    let fetched = store.entries(1, 3)?;
    println!("Fetched entries: {:?}", fetched);
    
    // 创建快照并压缩日志
    store.snapshot(2)?;
    store.compact(2)?;
    
    Ok(())
}

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

raft-engine为Raft算法提供了高效的持久化层,通过合理使用可以显著提升分布式系统的性能和可靠性。

回到顶部