Rust分布式存储引擎raft-engine的使用,高效实现Raft共识算法的持久化与状态管理
Rust分布式存储引擎raft-engine的使用,高效实现Raft共识算法的持久化与状态管理
Raft Engine简介
Raft Engine是一个持久化的嵌入式存储引擎,采用类似bitcask的日志结构化设计。它是为TiKV设计的,用于存储Multi-Raft日志。
主要特性
- 提供API用于存储和检索具有连续索引的protobuf日志条目
- 为各个Raft Group提供键值存储
- 最小化写入放大
- 协作式垃圾回收
- 支持对日志条目进行lz4压缩
- 支持文件系统扩展
设计架构
Raft Engine由两个基本组件构成:memtable和日志文件。
在内存中,每个Raft Group都有自己的memtable,包含所有键值对和所有日志条目的文件位置。在存储上,用户写入被顺序写入活动日志文件,该文件会根据配置的阈值定期轮换。不同的Raft Group共享相同的日志流。
写入流程
与RocksDB类似,Raft Engine提供原子写入。用户可以在提交前将更改存入日志批次。
一个日志批次的写入可以分为三个步骤:
- 可选地压缩日志条目
- 写入日志文件
- 应用到memtable
在第2步,为了分组并发请求,每个写入线程必须进入队列。队列中的第一个自动成为队列领导者,负责将整个组写入日志文件。
支持同步和非同步写入。当批次中的一个写入被标记为同步时,批次领导者将在写入后调用fdatasync()
。这样,缓冲数据可以保证被刷新到存储上。
数据写入后,每个写入线程将继续独立地将更改应用到memtable。
垃圾回收
当更改应用到本地状态机后,相应的日志条目可以从Raft Engine中逻辑压缩。由于多个Raft Group共享相同的日志流,这些截断的日志会在日志文件中形成空洞。在垃圾回收期间,Raft Engine扫描这些空洞并压缩日志文件以释放存储空间。只有在这个时候,不需要的日志条目才会被物理删除。
Raft Engine以协作方式执行垃圾回收。
首先,它的时间由用户控制。只有当用户主动调用purge_expired_files()
例程时,Raft Engine才会合并和删除其日志文件。作为参考,TiKV默认每10秒调用一次。
其次,它向用户提供有用的反馈。每次调用GC例程时,Raft Engine都会检查自身并返回持有特别旧的日志条目的Raft Group列表。这些日志条目阻碍了GC进度,应该由用户进行压缩。
使用示例
在Cargo.toml中添加依赖:
[dependencies]
raft-engine = "0.4"
可用Cargo特性:
scripting
: 编译时包含Rhai。这启用了脚本调试工具,包括unsafe_repair
nightly
: 启用仅限nightly的特性,包括test
internals
: 重新导出Raft Engine内部的关键组件。为docs.rs构建时启用failpoints
: 启用由tikv/fail-rs支持的故障点测试swap
: 使用SwappyAllocator
限制Raft Engine的内存使用。内存预算可以用"memory-limit"配置。依赖于nightly
特性
完整示例代码
use raft_engine::{Config, Engine, LogBatch};
use prost::Message;
// 定义日志条目Protobuf消息
#[derive(Clone, Message)]
pub struct Entry {
#[prost(uint64, tag = "1")]
pub index: u64,
#[prost(bytes, tag = "2")]
pub data: Vec<u8>,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
// 配置Raft Engine
let cfg = Config {
dir: "./raft-engine".to_string(),
file_size: 128 * 1024 * 1024, // 128MB
purge_threshold: 16 * 1024 * 1024, // 16MB
..Default::default()
};
// 创建引擎实例
let engine = Engine::open(cfg)?;
// 模拟Raft Group ID
let raft_group_id = 1;
// 创建日志批次
let mut batch = LogBatch::default();
// 准备日志条目
let entries = vec![
Entry {
index: 1,
data: b"first log entry".to_vec(),
},
Entry {
index: 2,
data: b"second log entry".to_vec(),
},
];
// 将条目添加到批次
for entry in entries {
let mut buf = Vec::new();
entry.encode(&mut buf)?;
batch.add_entries(raft_group_id, &[entry.index], &[buf]);
}
// 写入引擎
engine.write(&mut batch, false)?;
// 读取日志条目
if let Some(data) = engine.get_entry(raft_group_id, 1)? {
let entry = Entry::decode(&data[..])?;
println!("Read entry at index 1: {:?}", entry);
}
// 模拟日志压缩
engine.compact_to(raft_group_id, 1)?;
// 执行垃圾回收
engine.purge_expired_files()?;
Ok(())
}
贡献指南
欢迎贡献!以下是提交PR的一些提示:
- 所有提交必须签名(使用
git commit -s
)以通过DCO检查 - 更改会自动运行测试,其中一些可以在本地运行:
# 使用nightly特性运行测试
make
# 在稳定工具链上运行测试
make WITH_STABLE_TOOLCHAIN=force
# 过滤特定测试用例
make test EXTRA_CARGO_ARGS=<testname>
许可证
版权所有 © 2017-present, PingCAP, Inc. 根据Apache 2.0许可证发布。
Rust分布式存储引擎raft-engine的使用
介绍
raft-engine是一个专为Raft共识算法设计的持久化存储引擎,它高效地实现了Raft日志的持久化和状态管理。这个库由PingCAP团队开发,主要用于TiKV项目,但也可以作为独立的存储引擎在其他Raft实现中使用。
raft-engine的主要特点包括:
- 高性能的日志存储和检索
- 自动压缩和垃圾回收
- 批量写入优化
- 支持快照
- 线程安全的设计
安装
在Cargo.toml中添加依赖:
[dependencies]
raft-engine = "0.4"
基本使用方法
1. 创建存储引擎
use raft_engine::{Config, Engine};
fn main() {
// 配置存储引擎
let cfg = Config {
dir: "./raft-engine-data".to_string(),
..Default::default()
};
// 创建引擎实例
let engine = Engine::open(cfg).unwrap();
}
2. 写入Raft日志
use raft_engine::LogBatch;
let mut lb = LogBatch::default();
let region_id = 1;
let index = 1;
let data = b"some raft log data";
// 添加日志条目
lb.add_entries(region_id, &[(index, data)]);
// 写入引擎
engine.write(&mut lb, false).unwrap();
3. 读取Raft日志
// 读取单个日志条目
if let Some(entry) = engine.get_entry(region_id, index).unwrap() {
println!("Got entry: {:?}", entry);
}
// 读取日志范围
let entries = engine.fetch_entries_to(
region_id,
index,
index + 10,
None,
u64::MAX
).unwrap();
println!("Fetched entries: {:?}", entries);
4. 处理快照
// 保存快照
let snapshot_index = 10;
engine.purge_expired_files().unwrap();
engine.clean(region_id, snapshot_index).unwrap();
// 应用快照后,可以安全地删除旧日志
engine.compact_to(region_id, snapshot_index).unwrap();
高级功能
批量操作
let mut lb = LogBatch::default();
for i in 0..100 {
let data = format!("log entry {}", i).into_bytes();
lb.add_entries(region_id, &[(i, &data)]);
}
// 批量写入
engine.write(&mut lb, true).unwrap();
配置调优
let cfg = Config {
dir: "./raft-engine-data".to_string(),
batch_size: 8 * 1024 * 1024, // 8MB
purge_threshold: 16 * 1024 * 1024, // 16MB
..Default::default()
};
性能建议
- 尽量使用批量写入而不是单条写入
- 根据工作负载调整
batch_size
和purge_threshold
- 定期调用
purge_expired_files
和compact_to
进行维护 - 对于高吞吐场景,考虑使用
write
方法的第二个参数设置为true
启用sync
完整示例代码
use raft_engine::{Config, Engine, LogBatch};
use raft::prelude::*;
use std::path::Path;
// 自定义Raft存储实现
struct RaftStore {
engine: Engine,
region_id: u64,
}
impl RaftStore {
// 创建新的存储实例
fn new<P: AsRef<Path>>(path: P, region_id: u64) -> Self {
let cfg = Config {
dir: path.as_ref().to_str().unwrap().to_string(),
batch_size: 8 * 1024 * 1024, // 8MB
purge_threshold: 16 * 1024 * 1024, // 16MB
..Default::default()
};
let engine = Engine::open(cfg).expect("Failed to open raft engine");
Self { engine, region_id }
}
// 追加日志条目
fn append(&self, entries: &[Entry]) -> Result<()> {
let mut batch = LogBatch::default();
for entry in entries {
batch.add_entries(
self.region_id,
&[(entry.index, &entry.data)],
);
}
self.engine.write(&mut batch, false)?;
Ok(())
}
// 获取日志条目范围
fn entries(&self, low: u64, high: u64) -> Result<Vec<Entry>> {
let entries = self.engine
.fetch_entries_to(self.region_id, low, high, None, u64::MAX)?
.into_iter()
.map(|(index, data)| Entry {
entry_type: EntryType::EntryNormal,
term: 1, // 示例中简化处理,实际应从日志中获取
index,
data,
context: vec![],
sync_log: false,
})
.collect();
Ok(entries)
}
// 创建快照
fn snapshot(&self, index: u64) -> Result<()> {
self.engine.purge_expired_files()?;
self.engine.clean(self.region_id, index)?;
Ok(())
}
// 压缩日志
fn compact(&self, index: u64) -> Result<()> {
self.engine.compact_to(self.region_id, index)?;
Ok(())
}
}
// 示例使用
fn main() -> Result<()> {
// 初始化存储
let store = RaftStore::new("./raft-data", 1);
// 写入示例日志
let entries = vec![
Entry {
entry_type: EntryType::EntryNormal,
term: 1,
index: 1,
data: b"first log".to_vec(),
context: vec![],
sync_log: false,
},
Entry {
entry_type: EntryType::EntryNormal,
term: 1,
index: 2,
data: b"second log".to_vec(),
context: vec![],
sync_log: false,
},
];
store.append(&entries)?;
// 读取日志
let fetched = store.entries(1, 3)?;
println!("Fetched entries: {:?}", fetched);
// 创建快照并压缩日志
store.snapshot(2)?;
store.compact(2)?;
Ok(())
}
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
raft-engine为Raft算法提供了高效的持久化层,通过合理使用可以显著提升分布式系统的性能和可靠性。