golang实现Outbox模式的分布式事务处理插件库outboxer的使用
Golang实现Outbox模式的分布式事务处理插件库outboxer的使用
Outboxer简介
Outboxer是一个Go语言库,实现了Outbox模式(发件箱模式)。它简化了消息可靠性编排的复杂工作,主要解决以下问题:
当消息代理/消费者不可用时,生产者如何可靠地发送消息?
如果您有一个分布式系统架构,特别是主要处理事件驱动架构,您可能会想要使用outboxer。
快速开始
首先需要在项目中引入outboxer包:
go get github.com/italolelis/outboxer
使用示例
下面是一个使用Google PubSub和PostgreSQL作为Outbox模式组件的完整示例:
package main
import (
"context"
"database/sql"
"fmt"
"os"
"time"
"github.com/italolelis/outboxer"
"github.com/italolelis/outboxer/es/amqp"
"github.com/italolelis/outboxer/es/pubsub"
"github.com/italolelis/outboxer/storage/postgres"
"google.golang.org/api/option"
)
func main() {
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 连接PostgreSQL数据库
db, err := sql.Open("postgres", os.Getenv("DS_DSN"))
if err != nil {
fmt.Printf("could not connect to postgres: %s", err)
return
}
defer db.Close()
// 创建数据存储实例
ds, err := postgres.WithInstance(ctx, db)
if err != nil {
fmt.Printf("could not setup the data store: %s", err)
return
}
defer ds.Close()
// 创建Google PubSub客户端
client, err := pubsub.NewClient(ctx, os.Getenv("GCP_PROJECT_ID"))
if err != nil {
fmt.Printf("failed to connect to gcp: %s", err)
return
}
// 创建事件流
es := pubsubOut.New(client)
// 创建Outboxer实例
o, err := outboxer.New(
outboxer.WithDataStore(ds),
outboxer.WithEventStream(es),
outboxer.WithCheckInterval(1*time.Second),
outboxer.WithCleanupInterval(5*time.Second),
outboxer.WithCleanUpBefore(time.Now().AddDate(0, 0, -5)),
)
if err != nil {
fmt.Printf("could not create an outboxer instance: %s", err)
return
}
// 启动Outboxer检查和清理goroutine
o.Start(ctx)
defer o.Stop()
// 发送消息
if err = o.Send(ctx, &outboxer.OutboxMessage{
Payload: []byte("test payload"),
Options: map[string]interface{}{
amqpOut.ExchangeNameOption: "test",
amqpOut.ExchangeTypeOption: "topic",
amqpOut.RoutingKeyOption: "test.send",
},
}); err != nil {
fmt.Printf("could not send message: %s", err)
return
}
// 监听错误和成功发送的消息
for {
select {
case err := <-o.ErrChan():
fmt.Printf("could not send message: %s", err)
case <-o.OkChan():
fmt.Printf("message received")
return
}
}
}
功能特性
Outboxer提供了多种数据存储和事件流的实现:
数据存储
- Postgres DataStore
- MySQL DataStore
- SQLServer DataStore
事件流
- AMQP EventStream
- Kinesis EventStream
- SQS EventStream
- GCP PubSub
工作原理
- 初始化:创建数据存储和事件流实例
- 配置:设置检查间隔、清理间隔等参数
- 启动:开始后台goroutine定期检查未发送的消息
- 发送:将消息存入数据库并标记为待发送
- 处理:后台goroutine将待发送消息推送到事件流
- 确认:监听发送结果(成功或失败)
注意事项
- 确保数据库连接和事件流连接正确配置
- 根据业务需求调整检查间隔和清理间隔
- 消息发送是异步的,Send方法只是将消息存入数据库
- 需要处理发送失败的情况
这个示例展示了如何使用outboxer库实现可靠的分布式消息传递,确保即使在消息代理不可用时,消息也不会丢失,而是会在代理恢复后自动发送。
更多关于golang实现Outbox模式的分布式事务处理插件库outboxer的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现Outbox模式的分布式事务处理插件库outboxer的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang实现Outbox模式的分布式事务处理插件库outboxer
Outbox模式是一种解决分布式事务问题的常用模式,它通过在本地事务中将需要发送的消息写入数据库表(outbox表),然后由单独的进程将这些消息转发到消息队列,从而保证本地事务和消息发送的原子性。
outboxer库简介
outboxer是一个Golang实现的Outbox模式库,它提供了以下核心功能:
- 将业务数据和消息写入同一个本地事务
- 异步将消息投递到消息队列
- 保证消息至少投递一次
- 提供重试机制
安装outboxer
go get github.com/italolelis/outboxer
基本使用示例
1. 初始化outboxer
package main
import (
"context"
"database/sql"
"log"
"time"
"github.com/italolelis/outboxer"
_ "github.com/lib/pq"
)
func main() {
// 初始化数据库连接
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 创建outboxer实例
ctx := context.Background()
ob := outboxer.New(
outboxer.WithDataStoreProvider(outboxer.NewPostgresProvider(db)),
outboxer.WithEventStream(outboxer.NewAMQPStream("amqp://guest:guest@localhost:5672/")),
outboxer.WithCheckInterval(1*time.Second),
outboxer.WithCleanupInterval(24*time.Hour),
outboxer.WithCleanUpBefore(time.Now().Add(-240*time.Hour)),
)
// 启动outboxer
go func() {
if err := ob.Start(ctx); err != nil {
log.Fatal(err)
}
}()
}
2. 在事务中使用outboxer
func createOrder(ctx context.Context, db *sql.DB, ob *outboxer.Outboxer, order Order) error {
// 开始事务
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
}
}()
// 执行业务操作 - 插入订单
_, err = tx.ExecContext(ctx,
"INSERT INTO orders(id, customer_id, amount) VALUES($1, $2, $3)",
order.ID, order.CustomerID, order.Amount)
if err != nil {
return err
}
// 准备要发送的事件消息
event := outboxer.OutboxMessage{
Payload: []byte(`{"order_id": "` + order.ID + `", "amount": ` + strconv.Itoa(order.Amount) + `}`),
Options: map[string]interface{}{
"exchange": "orders",
"routingKey": "order.created",
},
}
// 将事件写入outbox表(与业务数据在同一个事务中)
if err := ob.Send(ctx, tx, &event); err != nil {
return err
}
// 提交事务
return tx.Commit()
}
3. 自定义消息处理器
type CustomHandler struct{}
func (h *CustomHandler) Handle(ctx context.Context, evt *outboxer.OutboxMessage) error {
log.Printf("processing message with id %s and payload %s", evt.ID, string(evt.Payload))
// 这里可以添加自定义处理逻辑
// 比如根据消息内容决定发送到哪个队列
return nil
}
func main() {
// ... 初始化代码
ob := outboxer.New(
outboxer.WithDataStoreProvider(outboxer.NewPostgresProvider(db)),
outboxer.WithEventStream(&CustomHandler{}),
// ... 其他选项
)
// ... 启动代码
}
高级配置
重试策略
ob := outboxer.New(
outboxer.WithDataStoreProvider(outboxer.NewPostgresProvider(db)),
outboxer.WithEventStream(outboxer.NewAMQPStream("amqp://guest:guest@localhost:5672/")),
outboxer.WithMaxRetry(5), // 最大重试次数
outboxer.WithRetryInterval(5*time.Second), // 重试间隔
)
清理策略
ob := outboxer.New(
// ... 其他配置
outboxer.WithCleanupInterval(24*time.Hour), // 每24小时清理一次
outboxer.WithCleanUpBefore(time.Now().Add(-240*time.Hour)), // 清理240小时前的记录
)
最佳实践
- 消息幂等性:确保消息处理是幂等的,因为outboxer保证至少投递一次
- 消息大小:避免发送过大的消息,考虑只发送事件ID然后在消费者端查询详情
- 错误处理:实现适当的错误处理和监控
- 性能考虑:对于高吞吐场景,调整批处理大小和检查间隔
总结
outboxer库为Golang应用提供了一种简单可靠的方式来实现Outbox模式,解决了分布式事务中的消息可靠投递问题。通过将消息写入本地数据库事务,然后异步投递到消息队列,它既保证了数据一致性,又提供了良好的系统性能。
在实际应用中,你可以根据具体需求扩展或自定义outboxer的行为,比如添加特定的消息序列化方式、实现更复杂的路由逻辑等。