golang轻量级事务性发件箱模式实现插件库outbox的使用
Golang轻量级事务性发件箱模式实现插件库outbox的使用
关键特性
- 轻量级: 仅添加一个外部依赖: google/uuid
- 数据库无关: 支持PostgreSQL、MySQL、Oracle等关系型数据库
- 消息代理无关: 适用于任何消息代理或外部系统
- 易于集成: 设计简单,易于集成到项目中
- 可观测性: 提供处理错误和丢弃消息的通道,可连接到监控和告警系统
- 快速发布: 可选的事务提交后立即异步发布消息,减少延迟,同时提供保证交付的备用方案
- 可配置重试和退避策略: 支持固定、指数或自定义退避策略
- 最大尝试次数保护: 自动丢弃超过配置的
maxAttempts
阈值的毒消息
使用方式
该库包含两个主要组件:
- Writer: 在事务中原子性地存储实体和对应的消息
- Reader: 在后台将存储的消息发布到消息代理
Writer
Writer确保实体和发件箱消息原子性地一起存储:
// 设置数据库连接
db, _ := sql.Open("pgx", "postgres://user:password@localhost:5432/outbox?sslmode=disable")
// 创建DBContext和Writer实例
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectPostgres)
writer := outbox.NewWriter(dbCtx)
// 在业务逻辑中:
//
// 创建实体和发件箱消息
entity := Entity{
ID: uuid.New(),
CreatedAt: time.Now().UTC(),
}
payload, _ := json.Marshal(entity)
metadata := json.RawMessage(`{"trace_id":"abc123","correlation_id":"xyz789"}`)
msg := outbox.NewMessage(payload,
outbox.WithCreatedAt(entity.CreatedAt),
outbox.WithMetadata(metadata))
// 在单个事务中写入消息和实体
err = writer.Write(ctx, msg,
// 这个用户定义的回调在与存储发件箱消息相同的事务中执行查询
func(ctx context.Context, txQueryer outbox.TxQueryer) error {
_, err := txQueryer.ExecContext(ctx,
"INSERT INTO entity (id, created_at) VALUES ($1, $2)",
entity.ID, entity.CreatedAt,
)
return err
})
Reader
Reader定期检查未发送的消息并将它们发布到消息代理:
// 创建消息发布器实现
type messagePublisher struct {
// 你的消息代理客户端(如Kafka、RabbitMQ)
}
func (p *messagePublisher) Publish(ctx context.Context, msg *outbox.Message) error {
// 将消息发布到你的代理
return nil
}
// 创建并启动reader
reader := outbox.NewReader(
dbCtx, // 数据库上下文
&messagePublisher{}, // 发布器实现
outbox.WithInterval(5*time.Second), // 轮询间隔(默认: 10s)
outbox.WithReadBatchSize(200), // 读取批次大小(默认: 100)
outbox.WithDeleteBatchSize(50), // 删除批次大小(默认: 20)
outbox.WithMaxAttempts(300), // 300次尝试后丢弃(默认: MaxInt32)
outbox.WithExponentialDelay( // 尝试之间的延迟(默认: 指数;也可以使用固定或自定义)
500*time.Millisecond, // 初始延迟(默认: 200ms)
30*time.Minute), // 最大延迟(默认: 1h)
)
reader.Start()
defer reader.Stop(context.Background()) // 在应用关闭时停止
// 监控标准处理错误(发布/更新/删除/读取)
go func() {
for err := range reader.Errors() {
switch e := err.(type) {
case *outbox.PublishError:
log.Printf("Failed to publish message | ID: %s | Error: %v",
e.Message.ID, e.Err)
case *outbox.UpdateError:
log.Printf("Failed to update message | ID: %s | Error: %v",
e.Message.ID, e.Err)
case *outbox.DeleteError:
log.Printf("Batch message deletion failed | Count: %d | Error: %v",
len(e.Messages), e.Err)
for _, msg := range e.Messages {
log.Printf("Failed to delete message | ID: %s", msg.ID)
}
case *outbox.ReadError:
log.Printf("Failed to read outbox messages | Error: %v", e.Err)
default:
log.Printf("Unexpected error occurred | Error: %v", e)
}
}
}()
// 监控丢弃的消息(达到最大尝试次数阈值)
go func() {
for msg := range reader.DiscardedMessages() {
log.Printf("outbox message %s discarded after %d attempts",
msg.ID, msg.TimesAttempted)
// 示例后续步骤:
// • 转发到死信主题
// • 触发告警/指标
// • 持久化以供手动检查
}
}()
数据库设置
1. 选择数据库方言
库支持多种关系型数据库。创建DBContext
时配置适当的SQLDialect
。支持的方言有PostgreSQL、MySQL、MariaDB、SQLite、Oracle和SQL Server。
// 使用MySQL方言创建DBContext的示例
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectMySQL)
2. 创建发件箱表
发件箱表存储需要发布到消息代理的消息。根据你的数据库选择:
🐘 PostgreSQL
CREATE TABLE IF NOT EXISTS outbox (
id UUID PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
metadata BYTEA,
payload BYTEA NOT NULL,
times_attempted INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);
📊 MySQL
CREATE TABLE IF NOT EXISTS outbox (
id BINARY(16) PRIMARY KEY,
created_at TIMESTAMP(3) NOT NULL,
scheduled_at TIMESTAMP(3) NOT NULL,
metadata BLOB,
payload BLOB NOT NULL,
times_attempted INT NOT NULL
);
CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);
示例
完整的工作示例:
- Postgres & Kafka
- Oracle & NATS
- MySQL & RabbitMQ
运行示例:
cd examples/postgres-kafka # 或examples/oracle-nats或examples/mysql-rabitmq
../../scripts/up-and-wait.sh
go run service.go
# 在另一个终端触发POST请求创建实体
curl -X POST http://localhost:8080/entity
常见问题
当我的服务的多个实例使用该库时会发生什么?
当运行多个服务实例时,每个实例都有自己的reader,请注意:
- 多个reader会独立检索消息,可能导致消息被多次发布。处理方式:
- 确保消费者是幂等的并接受重复
- 如果可用,使用代理的重复数据删除功能
- 仅在单个实例中运行reader
乐观发布功能可以显著减少重复数量。
使用pgxpool时如何实例化DBContext?
可以使用stdlib.OpenDBFromPool
函数从*pgxpool.Pool
获取*sql.DB
。
import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/oagudo/outbox"
)
// ...
pool, _ := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
db := stdlib.OpenDBFromPool(pool)
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectPostgres)
更多关于golang轻量级事务性发件箱模式实现插件库outbox的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang轻量级事务性发件箱模式实现插件库outbox的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang轻量级事务性发件箱模式实现
发件箱模式(Outbox Pattern)是一种解决分布式事务问题的常用模式,它通过在本地数据库事务中记录待发送的消息,然后由后台进程异步发送这些消息,确保本地操作和消息发送的最终一致性。
发件箱模式核心思想
- 将消息作为业务数据的一部分,在同一个事务中写入本地数据库
- 后台进程定期扫描未发送的消息并投递到消息队列
- 消息成功投递后标记为已发送
轻量级实现方案
以下是使用Golang实现的轻量级发件箱模式核心代码:
package outbox
import (
"context"
"database/sql"
"encoding/json"
"time"
)
// Message 发件箱消息结构
type Message struct {
ID int64 `json:"id"`
Topic string `json:"topic"`
Key string `json:"key"`
Payload json.RawMessage `json:"payload"`
CreatedAt time.Time `json:"created_at"`
SentAt *time.Time `json:"sent_at"`
}
// Store 发件箱存储接口
type Store interface {
AddMessage(ctx context.Context, tx *sql.Tx, topic, key string, payload interface{}) error
GetUnsentMessages(ctx context.Context, batchSize int) ([]Message, error)
MarkAsSent(ctx context.Context, ids []int64) error
}
// Publisher 消息发布接口
type Publisher interface {
Publish(ctx context.Context, topic, key string, payload []byte) error
}
// Outbox 发件箱主结构
type Outbox struct {
store Store
publisher Publisher
batchSize int
}
func NewOutbox(store Store, publisher Publisher, batchSize int) *Outbox {
return &Outbox{
store: store,
publisher: publisher,
batchSize: batchSize,
}
}
// AddMessage 在事务中添加消息
func (o *Outbox) AddMessage(ctx context.Context, tx *sql.Tx, topic, key string, payload interface{}) error {
return o.store.AddMessage(ctx, tx, topic, key, payload)
}
// ProcessOutbox 处理未发送的消息
func (o *Outbox) ProcessOutbox(ctx context.Context) error {
messages, err := o.store.GetUnsentMessages(ctx, o.batchSize)
if err != nil {
return err
}
var ids []int64
for _, msg := range messages {
if err := o.publisher.Publish(ctx, msg.Topic, msg.Key, msg.Payload); err != nil {
return err
}
ids = append(ids, msg.ID)
}
if len(ids) > 0 {
return o.store.MarkAsSent(ctx, ids)
}
return nil
}
使用示例
1. 初始化发件箱
// 实现Store和Publisher接口
type PostgresStore struct {
db *sql.DB
}
type KafkaPublisher struct {
producer sarama.SyncProducer
}
// 初始化
store := &PostgresStore{db: db}
publisher := &KafkaPublisher{producer: kafkaProducer}
outbox := outbox.NewOutbox(store, publisher, 100)
// 后台处理goroutine
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := outbox.ProcessOutbox(context.Background()); err != nil {
log.Printf("Process outbox error: %v", err)
}
case <-ctx.Done():
return
}
}
}()
2. 在业务事务中使用
func CreateOrder(ctx context.Context, order Order) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
}
}()
// 保存订单
if err := saveOrder(tx, order); err != nil {
return err
}
// 添加发件箱消息
event := OrderCreatedEvent{
OrderID: order.ID,
Amount: order.Amount,
}
if err := outbox.AddMessage(ctx, tx, "orders", order.ID, event); err != nil {
return err
}
return tx.Commit()
}
实现建议
- 存储实现:可以使用PostgreSQL、MySQL等关系型数据库实现Store接口
- 消息去重:为消息添加唯一ID或使用幂等性处理
- 错误处理:实现重试机制和死信队列处理无法发送的消息
- 性能优化:批量处理消息,减少数据库和消息队列的IO操作
- 监控:添加指标监控未处理消息数量和延迟
现有库推荐
如果需要更完整的解决方案,可以考虑以下开源库:
- watermill-outbox: https://github.com/ThreeDotsLabs/watermill-outbox
- go-outbox: https://github.com/alextanhongpin/go-outbox
- eventhorizon: https://github.com/looplab/eventhorizon
这些库提供了更完善的功能,如Exactly-Once投递、多种存储后端支持等。
发件箱模式是处理分布式事务的实用模式,通过将消息存储与业务数据放在同一事务中,确保了数据一致性,同时异步处理机制保证了系统性能。