golang轻量级事务性发件箱模式实现插件库outbox的使用

Golang轻量级事务性发件箱模式实现插件库outbox的使用

outbox logo

关键特性

  • 轻量级: 仅添加一个外部依赖: google/uuid
  • 数据库无关: 支持PostgreSQL、MySQL、Oracle等关系型数据库
  • 消息代理无关: 适用于任何消息代理或外部系统
  • 易于集成: 设计简单,易于集成到项目中
  • 可观测性: 提供处理错误和丢弃消息的通道,可连接到监控和告警系统
  • 快速发布: 可选的事务提交后立即异步发布消息,减少延迟,同时提供保证交付的备用方案
  • 可配置重试和退避策略: 支持固定、指数或自定义退避策略
  • 最大尝试次数保护: 自动丢弃超过配置的maxAttempts阈值的毒消息

使用方式

该库包含两个主要组件:

  1. Writer: 在事务中原子性地存储实体和对应的消息
  2. Reader: 在后台将存储的消息发布到消息代理

Writer

Writer确保实体和发件箱消息原子性地一起存储:

// 设置数据库连接
db, _ := sql.Open("pgx", "postgres://user:password@localhost:5432/outbox?sslmode=disable")

// 创建DBContext和Writer实例
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectPostgres)
writer := outbox.NewWriter(dbCtx)

// 在业务逻辑中:
//
// 创建实体和发件箱消息
entity := Entity{
    ID:        uuid.New(),
    CreatedAt: time.Now().UTC(),
}

payload, _ := json.Marshal(entity)
metadata := json.RawMessage(`{"trace_id":"abc123","correlation_id":"xyz789"}`)
msg := outbox.NewMessage(payload,
    outbox.WithCreatedAt(entity.CreatedAt),
    outbox.WithMetadata(metadata))

// 在单个事务中写入消息和实体
err = writer.Write(ctx, msg,
    // 这个用户定义的回调在与存储发件箱消息相同的事务中执行查询
    func(ctx context.Context, txQueryer outbox.TxQueryer) error {
        _, err := txQueryer.ExecContext(ctx,
            "INSERT INTO entity (id, created_at) VALUES ($1, $2)",
            entity.ID, entity.CreatedAt,
        )
        return err
    })

Reader

Reader定期检查未发送的消息并将它们发布到消息代理:

// 创建消息发布器实现
type messagePublisher struct {
    // 你的消息代理客户端(如Kafka、RabbitMQ)
}
func (p *messagePublisher) Publish(ctx context.Context, msg *outbox.Message) error {
    // 将消息发布到你的代理
    return nil
}

// 创建并启动reader
reader := outbox.NewReader(
    dbCtx,                              // 数据库上下文
    &messagePublisher{},                // 发布器实现
    outbox.WithInterval(5*time.Second), // 轮询间隔(默认: 10s)
    outbox.WithReadBatchSize(200),      // 读取批次大小(默认: 100)
    outbox.WithDeleteBatchSize(50),     // 删除批次大小(默认: 20)
    outbox.WithMaxAttempts(300),        // 300次尝试后丢弃(默认: MaxInt32)
    outbox.WithExponentialDelay(        // 尝试之间的延迟(默认: 指数;也可以使用固定或自定义)
        500*time.Millisecond,           // 初始延迟(默认: 200ms)
        30*time.Minute),                // 最大延迟(默认: 1h)
)
reader.Start()
defer reader.Stop(context.Background()) // 在应用关闭时停止

// 监控标准处理错误(发布/更新/删除/读取)
go func() {
    for err := range reader.Errors() {
        switch e := err.(type) {
        case *outbox.PublishError:
            log.Printf("Failed to publish message | ID: %s | Error: %v",
                e.Message.ID, e.Err)

        case *outbox.UpdateError:
            log.Printf("Failed to update message | ID: %s | Error: %v",
                e.Message.ID, e.Err)

        case *outbox.DeleteError:
            log.Printf("Batch message deletion failed | Count: %d | Error: %v",
                len(e.Messages), e.Err)
            for _, msg := range e.Messages {
                log.Printf("Failed to delete message | ID: %s", msg.ID)
            }

        case *outbox.ReadError:
            log.Printf("Failed to read outbox messages | Error: %v", e.Err)

        default:
            log.Printf("Unexpected error occurred | Error: %v", e)
        }
    }
}()

// 监控丢弃的消息(达到最大尝试次数阈值)
go func() {
    for msg := range reader.DiscardedMessages() {
        log.Printf("outbox message %s discarded after %d attempts",
            msg.ID, msg.TimesAttempted)
        // 示例后续步骤:
        //   • 转发到死信主题
        //   • 触发告警/指标
        //   • 持久化以供手动检查
    }
}()

数据库设置

1. 选择数据库方言

库支持多种关系型数据库。创建DBContext时配置适当的SQLDialect。支持的方言有PostgreSQL、MySQL、MariaDB、SQLite、Oracle和SQL Server。

// 使用MySQL方言创建DBContext的示例
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectMySQL)

2. 创建发件箱表

发件箱表存储需要发布到消息代理的消息。根据你的数据库选择:

🐘 PostgreSQL
CREATE TABLE IF NOT EXISTS outbox (
    id UUID PRIMARY KEY,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
    scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
    metadata BYTEA,
    payload BYTEA NOT NULL,
    times_attempted INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);
📊 MySQL
CREATE TABLE IF NOT EXISTS outbox (
    id BINARY(16) PRIMARY KEY,
    created_at TIMESTAMP(3) NOT NULL,
    scheduled_at TIMESTAMP(3) NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted INT NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);

示例

完整的工作示例:

  • Postgres & Kafka
  • Oracle & NATS
  • MySQL & RabbitMQ

运行示例:

cd examples/postgres-kafka # 或examples/oracle-nats或examples/mysql-rabitmq
../../scripts/up-and-wait.sh
go run service.go

# 在另一个终端触发POST请求创建实体
curl -X POST http://localhost:8080/entity

常见问题

当我的服务的多个实例使用该库时会发生什么?

当运行多个服务实例时,每个实例都有自己的reader,请注意:

  • 多个reader会独立检索消息,可能导致消息被多次发布。处理方式:
    1. 确保消费者是幂等的并接受重复
    2. 如果可用,使用代理的重复数据删除功能
    3. 仅在单个实例中运行reader

乐观发布功能可以显著减少重复数量。

使用pgxpool时如何实例化DBContext?

可以使用stdlib.OpenDBFromPool函数从*pgxpool.Pool获取*sql.DB

import (
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/jackc/pgx/v5/stdlib"
    "github.com/oagudo/outbox"
)

// ...
pool, _ := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
db := stdlib.OpenDBFromPool(pool)
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectPostgres)

更多关于golang轻量级事务性发件箱模式实现插件库outbox的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级事务性发件箱模式实现插件库outbox的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang轻量级事务性发件箱模式实现

发件箱模式(Outbox Pattern)是一种解决分布式事务问题的常用模式,它通过在本地数据库事务中记录待发送的消息,然后由后台进程异步发送这些消息,确保本地操作和消息发送的最终一致性。

发件箱模式核心思想

  1. 将消息作为业务数据的一部分,在同一个事务中写入本地数据库
  2. 后台进程定期扫描未发送的消息并投递到消息队列
  3. 消息成功投递后标记为已发送

轻量级实现方案

以下是使用Golang实现的轻量级发件箱模式核心代码:

package outbox

import (
	"context"
	"database/sql"
	"encoding/json"
	"time"
)

// Message 发件箱消息结构
type Message struct {
	ID        int64           `json:"id"`
	Topic     string          `json:"topic"`
	Key       string          `json:"key"`
	Payload   json.RawMessage `json:"payload"`
	CreatedAt time.Time       `json:"created_at"`
	SentAt    *time.Time      `json:"sent_at"`
}

// Store 发件箱存储接口
type Store interface {
	AddMessage(ctx context.Context, tx *sql.Tx, topic, key string, payload interface{}) error
	GetUnsentMessages(ctx context.Context, batchSize int) ([]Message, error)
	MarkAsSent(ctx context.Context, ids []int64) error
}

// Publisher 消息发布接口
type Publisher interface {
	Publish(ctx context.Context, topic, key string, payload []byte) error
}

// Outbox 发件箱主结构
type Outbox struct {
	store     Store
	publisher Publisher
	batchSize int
}

func NewOutbox(store Store, publisher Publisher, batchSize int) *Outbox {
	return &Outbox{
		store:     store,
		publisher: publisher,
		batchSize: batchSize,
	}
}

// AddMessage 在事务中添加消息
func (o *Outbox) AddMessage(ctx context.Context, tx *sql.Tx, topic, key string, payload interface{}) error {
	return o.store.AddMessage(ctx, tx, topic, key, payload)
}

// ProcessOutbox 处理未发送的消息
func (o *Outbox) ProcessOutbox(ctx context.Context) error {
	messages, err := o.store.GetUnsentMessages(ctx, o.batchSize)
	if err != nil {
		return err
	}

	var ids []int64
	for _, msg := range messages {
		if err := o.publisher.Publish(ctx, msg.Topic, msg.Key, msg.Payload); err != nil {
			return err
		}
		ids = append(ids, msg.ID)
	}

	if len(ids) > 0 {
		return o.store.MarkAsSent(ctx, ids)
	}

	return nil
}

使用示例

1. 初始化发件箱

// 实现Store和Publisher接口
type PostgresStore struct {
	db *sql.DB
}

type KafkaPublisher struct {
	producer sarama.SyncProducer
}

// 初始化
store := &PostgresStore{db: db}
publisher := &KafkaPublisher{producer: kafkaProducer}
outbox := outbox.NewOutbox(store, publisher, 100)

// 后台处理goroutine
go func() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			if err := outbox.ProcessOutbox(context.Background()); err != nil {
				log.Printf("Process outbox error: %v", err)
			}
		case <-ctx.Done():
			return
		}
	}
}()

2. 在业务事务中使用

func CreateOrder(ctx context.Context, order Order) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			tx.Rollback()
		}
	}()

	// 保存订单
	if err := saveOrder(tx, order); err != nil {
		return err
	}
	
	// 添加发件箱消息
	event := OrderCreatedEvent{
		OrderID: order.ID,
		Amount:  order.Amount,
	}
	if err := outbox.AddMessage(ctx, tx, "orders", order.ID, event); err != nil {
		return err
	}

	return tx.Commit()
}

实现建议

  1. 存储实现:可以使用PostgreSQL、MySQL等关系型数据库实现Store接口
  2. 消息去重:为消息添加唯一ID或使用幂等性处理
  3. 错误处理:实现重试机制和死信队列处理无法发送的消息
  4. 性能优化:批量处理消息,减少数据库和消息队列的IO操作
  5. 监控:添加指标监控未处理消息数量和延迟

现有库推荐

如果需要更完整的解决方案,可以考虑以下开源库:

  1. watermill-outbox: https://github.com/ThreeDotsLabs/watermill-outbox
  2. go-outbox: https://github.com/alextanhongpin/go-outbox
  3. eventhorizon: https://github.com/looplab/eventhorizon

这些库提供了更完善的功能,如Exactly-Once投递、多种存储后端支持等。

发件箱模式是处理分布式事务的实用模式,通过将消息存储与业务数据放在同一事务中,确保了数据一致性,同时异步处理机制保证了系统性能。

回到顶部