golang实现Outbox模式的分布式事务处理插件库outboxer的使用

Golang实现Outbox模式的分布式事务处理插件库outboxer的使用

Outboxer简介

Outboxer是一个Go语言库,实现了Outbox模式(发件箱模式)。它简化了消息可靠性编排的复杂工作,主要解决以下问题:

当消息代理/消费者不可用时,生产者如何可靠地发送消息?

如果您有一个分布式系统架构,特别是主要处理事件驱动架构,您可能会想要使用outboxer。

快速开始

首先需要在项目中引入outboxer包:

go get github.com/italolelis/outboxer

使用示例

下面是一个使用Google PubSub和PostgreSQL作为Outbox模式组件的完整示例:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "os"
    "time"

    "github.com/italolelis/outboxer"
    "github.com/italolelis/outboxer/es/amqp"
    "github.com/italolelis/outboxer/es/pubsub"
    "github.com/italolelis/outboxer/storage/postgres"
    "google.golang.org/api/option"
)

func main() {
    // 创建上下文
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 连接PostgreSQL数据库
    db, err := sql.Open("postgres", os.Getenv("DS_DSN"))
    if err != nil {
        fmt.Printf("could not connect to postgres: %s", err)
        return
    }
    defer db.Close()

    // 创建数据存储实例
    ds, err := postgres.WithInstance(ctx, db)
    if err != nil {
        fmt.Printf("could not setup the data store: %s", err)
        return
    }
    defer ds.Close()

    // 创建Google PubSub客户端
    client, err := pubsub.NewClient(ctx, os.Getenv("GCP_PROJECT_ID"))
    if err != nil {
        fmt.Printf("failed to connect to gcp: %s", err)
        return
    }

    // 创建事件流
    es := pubsubOut.New(client)

    // 创建Outboxer实例
    o, err := outboxer.New(
        outboxer.WithDataStore(ds),
        outboxer.WithEventStream(es),
        outboxer.WithCheckInterval(1*time.Second),
        outboxer.WithCleanupInterval(5*time.Second),
        outboxer.WithCleanUpBefore(time.Now().AddDate(0, 0, -5)),
    )
    if err != nil {
        fmt.Printf("could not create an outboxer instance: %s", err)
        return
    }

    // 启动Outboxer检查和清理goroutine
    o.Start(ctx)
    defer o.Stop()

    // 发送消息
    if err = o.Send(ctx, &outboxer.OutboxMessage{
        Payload: []byte("test payload"),
        Options: map[string]interface{}{
            amqpOut.ExchangeNameOption: "test",
            amqpOut.ExchangeTypeOption: "topic",
            amqpOut.RoutingKeyOption:   "test.send",
        },
    }); err != nil {
        fmt.Printf("could not send message: %s", err)
        return
    }

    // 监听错误和成功发送的消息
    for {
        select {
        case err := <-o.ErrChan():
            fmt.Printf("could not send message: %s", err)
        case <-o.OkChan():
            fmt.Printf("message received")
            return
        }
    }
}

功能特性

Outboxer提供了多种数据存储和事件流的实现:

数据存储

  • Postgres DataStore
  • MySQL DataStore
  • SQLServer DataStore

事件流

  • AMQP EventStream
  • Kinesis EventStream
  • SQS EventStream
  • GCP PubSub

工作原理

  1. 初始化:创建数据存储和事件流实例
  2. 配置:设置检查间隔、清理间隔等参数
  3. 启动:开始后台goroutine定期检查未发送的消息
  4. 发送:将消息存入数据库并标记为待发送
  5. 处理:后台goroutine将待发送消息推送到事件流
  6. 确认:监听发送结果(成功或失败)

注意事项

  1. 确保数据库连接和事件流连接正确配置
  2. 根据业务需求调整检查间隔和清理间隔
  3. 消息发送是异步的,Send方法只是将消息存入数据库
  4. 需要处理发送失败的情况

这个示例展示了如何使用outboxer库实现可靠的分布式消息传递,确保即使在消息代理不可用时,消息也不会丢失,而是会在代理恢复后自动发送。


更多关于golang实现Outbox模式的分布式事务处理插件库outboxer的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现Outbox模式的分布式事务处理插件库outboxer的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang实现Outbox模式的分布式事务处理插件库outboxer

Outbox模式是一种解决分布式事务问题的常用模式,它通过在本地事务中将需要发送的消息写入数据库表(outbox表),然后由单独的进程将这些消息转发到消息队列,从而保证本地事务和消息发送的原子性。

outboxer库简介

outboxer是一个Golang实现的Outbox模式库,它提供了以下核心功能:

  1. 将业务数据和消息写入同一个本地事务
  2. 异步将消息投递到消息队列
  3. 保证消息至少投递一次
  4. 提供重试机制

安装outboxer

go get github.com/italolelis/outboxer

基本使用示例

1. 初始化outboxer

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	"github.com/italolelis/outboxer"
	_ "github.com/lib/pq"
)

func main() {
	// 初始化数据库连接
	db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 创建outboxer实例
	ctx := context.Background()
	ob := outboxer.New(
		outboxer.WithDataStoreProvider(outboxer.NewPostgresProvider(db)),
		outboxer.WithEventStream(outboxer.NewAMQPStream("amqp://guest:guest@localhost:5672/")),
		outboxer.WithCheckInterval(1*time.Second),
		outboxer.WithCleanupInterval(24*time.Hour),
		outboxer.WithCleanUpBefore(time.Now().Add(-240*time.Hour)),
	)

	// 启动outboxer
	go func() {
		if err := ob.Start(ctx); err != nil {
			log.Fatal(err)
		}
	}()
}

2. 在事务中使用outboxer

func createOrder(ctx context.Context, db *sql.DB, ob *outboxer.Outboxer, order Order) error {
    // 开始事务
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer func() {
        if err != nil {
            tx.Rollback()
        }
    }()

    // 执行业务操作 - 插入订单
    _, err = tx.ExecContext(ctx, 
        "INSERT INTO orders(id, customer_id, amount) VALUES($1, $2, $3)",
        order.ID, order.CustomerID, order.Amount)
    if err != nil {
        return err
    }

    // 准备要发送的事件消息
    event := outboxer.OutboxMessage{
        Payload: []byte(`{"order_id": "` + order.ID + `", "amount": ` + strconv.Itoa(order.Amount) + `}`),
        Options: map[string]interface{}{
            "exchange":   "orders",
            "routingKey": "order.created",
        },
    }

    // 将事件写入outbox表(与业务数据在同一个事务中)
    if err := ob.Send(ctx, tx, &event); err != nil {
        return err
    }

    // 提交事务
    return tx.Commit()
}

3. 自定义消息处理器

type CustomHandler struct{}

func (h *CustomHandler) Handle(ctx context.Context, evt *outboxer.OutboxMessage) error {
    log.Printf("processing message with id %s and payload %s", evt.ID, string(evt.Payload))
    
    // 这里可以添加自定义处理逻辑
    // 比如根据消息内容决定发送到哪个队列
    
    return nil
}

func main() {
    // ... 初始化代码
    
    ob := outboxer.New(
        outboxer.WithDataStoreProvider(outboxer.NewPostgresProvider(db)),
        outboxer.WithEventStream(&CustomHandler{}),
        // ... 其他选项
    )
    
    // ... 启动代码
}

高级配置

重试策略

ob := outboxer.New(
    outboxer.WithDataStoreProvider(outboxer.NewPostgresProvider(db)),
    outboxer.WithEventStream(outboxer.NewAMQPStream("amqp://guest:guest@localhost:5672/")),
    outboxer.WithMaxRetry(5), // 最大重试次数
    outboxer.WithRetryInterval(5*time.Second), // 重试间隔
)

清理策略

ob := outboxer.New(
    // ... 其他配置
    outboxer.WithCleanupInterval(24*time.Hour), // 每24小时清理一次
    outboxer.WithCleanUpBefore(time.Now().Add(-240*time.Hour)), // 清理240小时前的记录
)

最佳实践

  1. 消息幂等性:确保消息处理是幂等的,因为outboxer保证至少投递一次
  2. 消息大小:避免发送过大的消息,考虑只发送事件ID然后在消费者端查询详情
  3. 错误处理:实现适当的错误处理和监控
  4. 性能考虑:对于高吞吐场景,调整批处理大小和检查间隔

总结

outboxer库为Golang应用提供了一种简单可靠的方式来实现Outbox模式,解决了分布式事务中的消息可靠投递问题。通过将消息写入本地数据库事务,然后异步投递到消息队列,它既保证了数据一致性,又提供了良好的系统性能。

在实际应用中,你可以根据具体需求扩展或自定义outboxer的行为,比如添加特定的消息序列化方式、实现更复杂的路由逻辑等。

回到顶部