golang分布式事务管理系统(2PC/3PC实现)插件库committer的使用
Golang分布式事务管理系统(2PC/3PC实现)插件库Committer的使用
Committer简介
Committer是一个用Go语言实现的**两阶段提交(2PC)和三阶段提交(3PC)**协议库,用于分布式系统。
架构设计
系统包含两种节点类型:协调者(Coordinator)和跟随者(Followers)。
- 协调者负责发起和管理提交协议(2PC或3PC)
- 跟随者(或称为cohorts)通过响应协调者的请求参与协议
- 节点间通信使用gRPC
- 每个节点的状态使用状态机管理
主要特性
- 支持2PC和3PC:实现了两种广泛使用的分布式事务共识协议
- 持久化:使用BadgerDB和WAL进行可靠的数据存储和事务日志记录
- 可配置:所有选项都可以通过命令行参数指定
- 验证钩子:可以在Propose和Commit阶段注入自定义逻辑
- 基于gRPC的通信:使用gRPC进行高效的节点间通信
配置参数
所有配置参数都可以通过命令行参数设置:
参数 | 描述 | 默认值 | 示例 |
---|---|---|---|
role |
节点角色: coordinator 或 follower |
follower |
-role=coordinator |
nodeaddr |
当前节点的地址 | localhost:3050 |
-nodeaddr=localhost:3051 |
coordinator |
协调者地址(跟随者必需) | "" |
-coordinator=localhost:3050 |
committype |
提交协议: two-phase 或 three-phase |
three-phase |
-committype=two-phase |
timeout |
未确认消息的超时时间(ms)(仅3PC) | 1000 |
-timeout=500 |
dbpath |
BadgerDB数据库在文件系统中的路径 | ./badger |
-dbpath=/tmp/badger |
followers |
跟随者地址的逗号分隔列表 | "" |
-followers=localhost:3052,3053 |
whitelist |
允许的主机列表 | 127.0.0.1 |
-whitelist=192.168.0.1,192.168.0.2 |
使用示例
作为跟随者运行
./committer -role=follower -nodeaddr=localhost:3001 -committype=three-phase -timeout=1000 -dbpath=/tmp/badger/follower
作为协调者运行
./committer -role=coordinator -nodeaddr=localhost:3000 -followers=localhost:3001 -committype=three-phase -timeout=1000 -dbpath=/tmp/badger/coordinator
钩子(Hooks)
钩子允许你在Propose和Commit阶段添加自定义验证逻辑。钩子是一个函数,接受*pb.ProposeRequest
或*pb.CommitRequest
并返回一个布尔值。
示例钩子实现:
// 示例Propose钩子
func ProposeHook(req *pb.ProposeRequest) bool {
// 自定义验证逻辑
if req.Key == "invalid" {
return false
}
return true
}
// 示例Commit钩子
func CommitHook(req *pb.CommitRequest) bool {
// 自定义验证逻辑
if req.Value == "invalid" {
return false
}
return true
}
要注入你自己的逻辑,可以替换钩子文件或main.go中的代码,然后重新编译。
完整示例Demo
以下是一个使用Committer库的完整示例:
package main
import (
"context"
"flag"
"log"
"time"
"github.com/vadiminshakov/committer/core/cohort"
"github.com/vadiminshakov/committer/pb"
"google.golang.org/grpc"
)
func main() {
// 解析命令行参数
role := flag.String("role", "follower", "node role (coordinator or follower)")
nodeAddr := flag.String("nodeaddr", "localhost:3000", "node address")
coordinatorAddr := flag.String("coordinator", "", "coordinator address (required for followers)")
followers := flag.String("followers", "", "comma-separated list of follower addresses")
commitType := flag.String("committype", "three-phase", "commit protocol (two-phase or three-phase)")
dbPath := flag.String("dbpath", "./badger", "path to BadgerDB")
flag.Parse()
// 初始化节点
if *role == "coordinator" {
// 作为协调者启动
coordinator := cohort.NewCoordinator(*nodeAddr, *followers, *commitType, *dbPath)
if err := coordinator.Run(); err != nil {
log.Fatal(err)
}
} else {
// 作为跟随者启动
follower := cohort.NewFollower(*nodeAddr, *coordinatorAddr, *commitType, *dbPath)
if err := follower.Run(); err != nil {
log.Fatal(err)
}
}
// 示例客户端代码
if *role == "client" {
conn, err := grpc.Dial(*coordinatorAddr, grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewCoordinatorClient(conn)
// 发起一个事务
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.Propose(ctx, &pb.ProposeRequest{
Key: "test-key",
Value: "test-value",
})
if err != nil {
log.Fatal(err)
}
if resp.Ack {
log.Println("Propose phase succeeded")
} else {
log.Println("Propose phase failed")
}
// 提交事务
commitResp, err := client.Commit(ctx, &pb.CommitRequest{
Key: "test-key",
})
if err != nil {
log.Fatal(err)
}
if commitResp.Ack {
log.Println("Commit phase succeeded")
} else {
log.Println("Commit phase failed")
}
}
}
测试
运行功能测试
make tests
使用示例客户端测试
- 编译可执行文件:
make prepare
- 运行协调者:
make run-example-coordinator
- 在另一个终端运行跟随者:
make run-example-follower
- 启动示例客户端:
go run ./examples/client/client.go
贡献
欢迎贡献代码!如果你发现bug或有改进建议,可以提交PR或创建issue。
许可证
该项目使用Apache License许可证。
更多关于golang分布式事务管理系统(2PC/3PC实现)插件库committer的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复