golang分布式事务管理系统(2PC/3PC实现)插件库committer的使用

Golang分布式事务管理系统(2PC/3PC实现)插件库Committer的使用

Committer简介

Committer是一个用Go语言实现的**两阶段提交(2PC)三阶段提交(3PC)**协议库,用于分布式系统。

Committer Logo

架构设计

系统包含两种节点类型:协调者(Coordinator)跟随者(Followers)

  • 协调者负责发起和管理提交协议(2PC或3PC)
  • 跟随者(或称为cohorts)通过响应协调者的请求参与协议
  • 节点间通信使用gRPC
  • 每个节点的状态使用状态机管理

主要特性

  • 支持2PC和3PC:实现了两种广泛使用的分布式事务共识协议
  • 持久化:使用BadgerDB和WAL进行可靠的数据存储和事务日志记录
  • 可配置:所有选项都可以通过命令行参数指定
  • 验证钩子:可以在ProposeCommit阶段注入自定义逻辑
  • 基于gRPC的通信:使用gRPC进行高效的节点间通信

配置参数

所有配置参数都可以通过命令行参数设置:

参数 描述 默认值 示例
role 节点角色: coordinatorfollower follower -role=coordinator
nodeaddr 当前节点的地址 localhost:3050 -nodeaddr=localhost:3051
coordinator 协调者地址(跟随者必需) "" -coordinator=localhost:3050
committype 提交协议: two-phasethree-phase three-phase -committype=two-phase
timeout 未确认消息的超时时间(ms)(仅3PC) 1000 -timeout=500
dbpath BadgerDB数据库在文件系统中的路径 ./badger -dbpath=/tmp/badger
followers 跟随者地址的逗号分隔列表 "" -followers=localhost:3052,3053
whitelist 允许的主机列表 127.0.0.1 -whitelist=192.168.0.1,192.168.0.2

使用示例

作为跟随者运行

./committer -role=follower -nodeaddr=localhost:3001 -committype=three-phase -timeout=1000 -dbpath=/tmp/badger/follower

作为协调者运行

./committer -role=coordinator -nodeaddr=localhost:3000 -followers=localhost:3001 -committype=three-phase -timeout=1000 -dbpath=/tmp/badger/coordinator

钩子(Hooks)

钩子允许你在ProposeCommit阶段添加自定义验证逻辑。钩子是一个函数,接受*pb.ProposeRequest*pb.CommitRequest并返回一个布尔值。

示例钩子实现:

// 示例Propose钩子
func ProposeHook(req *pb.ProposeRequest) bool {
    // 自定义验证逻辑
    if req.Key == "invalid" {
        return false
    }
    return true
}

// 示例Commit钩子
func CommitHook(req *pb.CommitRequest) bool {
    // 自定义验证逻辑
    if req.Value == "invalid" {
        return false
    }
    return true
}

要注入你自己的逻辑,可以替换钩子文件或main.go中的代码,然后重新编译。

完整示例Demo

以下是一个使用Committer库的完整示例:

package main

import (
	"context"
	"flag"
	"log"
	"time"

	"github.com/vadiminshakov/committer/core/cohort"
	"github.com/vadiminshakov/committer/pb"
	"google.golang.org/grpc"
)

func main() {
	// 解析命令行参数
	role := flag.String("role", "follower", "node role (coordinator or follower)")
	nodeAddr := flag.String("nodeaddr", "localhost:3000", "node address")
	coordinatorAddr := flag.String("coordinator", "", "coordinator address (required for followers)")
	followers := flag.String("followers", "", "comma-separated list of follower addresses")
	commitType := flag.String("committype", "three-phase", "commit protocol (two-phase or three-phase)")
	dbPath := flag.String("dbpath", "./badger", "path to BadgerDB")
	flag.Parse()

	// 初始化节点
	if *role == "coordinator" {
		// 作为协调者启动
		coordinator := cohort.NewCoordinator(*nodeAddr, *followers, *commitType, *dbPath)
		if err := coordinator.Run(); err != nil {
			log.Fatal(err)
		}
	} else {
		// 作为跟随者启动
		follower := cohort.NewFollower(*nodeAddr, *coordinatorAddr, *commitType, *dbPath)
		if err := follower.Run(); err != nil {
			log.Fatal(err)
		}
	}

	// 示例客户端代码
	if *role == "client" {
		conn, err := grpc.Dial(*coordinatorAddr, grpc.WithInsecure())
		if err != nil {
			log.Fatal(err)
		}
		defer conn.Close()

		client := pb.NewCoordinatorClient(conn)

		// 发起一个事务
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		resp, err := client.Propose(ctx, &pb.ProposeRequest{
			Key:   "test-key",
			Value: "test-value",
		})
		if err != nil {
			log.Fatal(err)
		}

		if resp.Ack {
			log.Println("Propose phase succeeded")
		} else {
			log.Println("Propose phase failed")
		}

		// 提交事务
		commitResp, err := client.Commit(ctx, &pb.CommitRequest{
			Key: "test-key",
		})
		if err != nil {
			log.Fatal(err)
		}

		if commitResp.Ack {
			log.Println("Commit phase succeeded")
		} else {
			log.Println("Commit phase failed")
		}
	}
}

测试

运行功能测试

make tests

使用示例客户端测试

  1. 编译可执行文件:
make prepare
  1. 运行协调者:
make run-example-coordinator
  1. 在另一个终端运行跟随者:
make run-example-follower
  1. 启动示例客户端:
go run ./examples/client/client.go

贡献

欢迎贡献代码!如果你发现bug或有改进建议,可以提交PR或创建issue。

许可证

该项目使用Apache License许可证。


更多关于golang分布式事务管理系统(2PC/3PC实现)插件库committer的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang分布式事务管理系统(2PC/3PC实现)插件库committer的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang分布式事务管理系统(2PC/3PC实现)插件库Committer使用指南

概述

Committer是一个用于实现分布式事务管理的Golang插件库,支持2PC(两阶段提交)和3PC(三阶段提交)协议。它提供了简洁的API接口,帮助开发者快速构建可靠的分布式事务系统。

核心概念

  • 2PC (两阶段提交): 分为准备阶段和提交/回滚阶段
  • 3PC (三阶段提交): 在2PC基础上增加了预提交阶段,提高系统可用性
  • 事务协调者(Coordinator): 负责管理整个事务流程
  • 参与者(Participant): 执行具体事务操作的资源管理器

安装

go get github.com/your-repo/committer

基本使用

1. 初始化Committer

import "github.com/your-repo/committer"

func main() {
    // 初始化Committer配置
    config := committer.Config{
        Protocol:      "2PC", // 或 "3PC"
        Timeout:       30 * time.Second,
        RetryTimes:    3,
        StorageDriver: "etcd", // 支持etcd、redis等
    }
    
    // 创建Committer实例
    cmt, err := committer.NewCommitter(config)
    if err != nil {
        log.Fatalf("Failed to create committer: %v", err)
    }
}

2. 定义事务参与者

// 实现Participant接口
type MyParticipant struct {
    name string
}

func (p *MyParticipant) Prepare() error {
    // 准备阶段逻辑
    fmt.Printf("Participant %s preparing...\n", p.name)
    return nil // 返回nil表示准备成功
}

func (p *MyParticipant) Commit() error {
    // 提交阶段逻辑
    fmt.Printf("Participant %s committing...\n", p.name)
    return nil
}

func (p *MyParticipant) Rollback() error {
    // 回滚阶段逻辑
    fmt.Printf("Participant %s rolling back...\n", p.name)
    return nil
}

3. 执行分布式事务

func executeTransaction(cmt *committer.Committer) error {
    // 创建事务参与者
    p1 := &MyParticipant{name: "ServiceA"}
    p2 := &MyParticipant{name: "ServiceB"}
    
    // 注册参与者
    cmt.RegisterParticipant(p1)
    cmt.RegisterParticipant(p2)
    
    // 开始事务
    txID, err := cmt.Begin()
    if err != nil {
        return fmt.Errorf("begin transaction failed: %v", err)
    }
    
    // 执行事务
    err = cmt.Execute(txID)
    if err != nil {
        return fmt.Errorf("execute transaction failed: %v", err)
    }
    
    return nil
}

高级功能

1. 自定义存储驱动

// 实现Storage接口
type MyStorage struct{}

func (s *MyStorage) Save(txID string, state committer.TransactionState) error {
    // 自定义保存逻辑
    return nil
}

func (s *MyStorage) Load(txID string) (committer.TransactionState, error) {
    // 自定义加载逻辑
    return committer.StatePrepared, nil
}

// 使用自定义存储
config := committer.Config{
    StorageDriver: "custom",
    Storage:       &MyStorage{},
}

2. 超时和重试配置

config := committer.Config{
    Protocol:      "3PC",
    Timeout:       60 * time.Second, // 全局超时
    PhaseTimeout:  10 * time.Second, // 各阶段超时
    RetryTimes:    5,                // 重试次数
    RetryInterval: 2 * time.Second,  // 重试间隔
}

3. 事务恢复机制

// 查询未完成的事务
pendingTxIDs, err := cmt.GetPendingTransactions()
if err != nil {
    log.Printf("Failed to get pending transactions: %v", err)
}

// 恢复事务
for _, txID := range pendingTxIDs {
    state, err := cmt.GetTransactionState(txID)
    if err != nil {
        continue
    }
    
    // 根据状态决定恢复策略
    switch state {
    case committer.StatePrepared:
        // 尝试提交
        err = cmt.Commit(txID)
    case committer.StatePreCommit:
        // 3PC的预提交状态处理
        err = cmt.DoCommit(txID)
    default:
        // 其他状态回滚
        err = cmt.Rollback(txID)
    }
}

最佳实践

  1. 幂等性设计: 确保所有参与者操作都是幂等的,以便在重试时不会产生副作用
  2. 超时处理: 合理设置超时时间,避免长时间阻塞
  3. 日志记录: 详细记录事务各阶段状态,便于排查问题
  4. 监控告警: 监控事务成功率、耗时等指标
  5. 优雅停机: 处理系统关闭时的未完成事务

性能优化建议

  1. 使用高效的存储驱动(如etcd)
  2. 批量处理多个事务
  3. 优化网络通信(如使用gRPC替代HTTP)
  4. 异步执行不影响一致性的操作

Committer插件库为Golang开发者提供了构建可靠分布式事务系统的强大工具,通过合理配置和使用,可以确保数据在分布式环境中的一致性。

回到顶部