golang实现RabbitMQ自动重连与发布重试的轻量级插件库rabbitroutine的使用
Golang实现RabbitMQ自动重连与发布重试的轻量级插件库rabbitroutine的使用
概述
rabbitroutine是一个轻量级的Golang库,它为您处理RabbitMQ的自动重连和发布重试逻辑。该库旨在解决开发人员在使用RabbitMQ时遇到的连接问题。
功能特性
- 分别处理连接错误和通道错误
- 考虑在RabbitMQ重连后需要重新声明实体
- 提供错误通知和连接重试尝试通知
- 支持FireAndForgetPublisher和EnsurePublisher,可以使用RetryPublisher进行包装
- 支持用于发布的通道池
- 提供通道池大小统计
安装
go get github.com/furdarius/rabbitroutine
使用示例
消费者示例
// Consumer 声明您自己的RabbitMQ消费者,实现rabbitroutine.Consumer接口
type Consumer struct {}
func (c *Consumer) Declare(ctx context.Context, ch *amqp.Channel) error {
// 在这里声明队列、交换机和绑定
_, err := ch.QueueDeclare(
"myqueue", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
nil, // 参数
)
return err
}
func (c *Consumer) Consume(ctx context.Context, ch *amqp.Channel) error {
// 在这里开始消费消息
msgs, err := ch.Consume(
"myqueue", // 队列名称
"", // 消费者标签
false, // 是否自动确认
false, // 是否排他
false, // 是否不等待
false, // 是否不本地
nil, // 参数
)
if err != nil {
return err
}
for {
select {
case msg := <-msgs:
// 处理消息
log.Printf("Received a message: %s", msg.Body)
msg.Ack(false)
case <-ctx.Done():
return nil
}
}
}
url := "amqp://guest:guest@127.0.0.1:5672/"
conn := rabbitroutine.NewConnector(rabbitroutine.Config{
// 重连等待时间
Wait: 2 * time.Second,
})
ctx := context.Background()
go func() {
err := conn.Dial(ctx, url)
if err != nil {
log.Println(err)
}
}()
consumer := &Consumer{}
go func() {
err := conn.StartConsumer(ctx, consumer)
if err != nil {
log.Println(err)
}
}()
发布者示例
ctx := context.Background()
url := "amqp://guest:guest@127.0.0.1:5672/"
conn := rabbitroutine.NewConnector(rabbitroutine.Config{
// 重连等待时间
Wait: 2 * time.Second,
})
// 创建通道池
pool := rabbitroutine.NewPool(conn)
// 创建确保发布的发布者
ensurePub := rabbitroutine.NewEnsurePublisher(pool)
// 使用重试机制包装发布者
pub := rabbitroutine.NewRetryPublisher(
ensurePub,
rabbitroutine.PublishMaxAttemptsSetup(16), // 最大重试次数
rabbitroutine.PublishDelaySetup(rabbitroutine.LinearDelay(10*time.Millisecond)), // 重试延迟
)
// 启动连接
go conn.Dial(ctx, url)
// 发布消息
err := pub.Publish(ctx, "myexch", "myqueue", amqp.Publishing{
Body: []byte("message"),
})
if err != nil {
log.Println("publish error:", err)
}
贡献指南
欢迎提交Pull Request。创建您的Pull Request时,请确保包含覆盖您更改的测试或示例,并且您的提交代表连贯的更改,包括更改原因。
要运行集成测试,请确保您在任何主机上运行RabbitMQ:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
然后导出环境变量AMQP_URL=amqp://host/
并运行go test -tags integration
。
AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -race -cpu=1,2 -tags integration -timeout 5s
使用golangci-lint检查代码:
golangci-lint run ./...
更多关于golang实现RabbitMQ自动重连与发布重试的轻量级插件库rabbitroutine的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang实现RabbitMQ自动重连与发布重试的轻量级插件库rabbitroutine的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
RabbitRoutine - Golang RabbitMQ 自动重连与发布重试库
RabbitRoutine 是一个轻量级的 Golang 库,用于简化 RabbitMQ 连接管理和消息发布重试机制。下面我将介绍如何使用它来实现可靠的 RabbitMQ 消息传递。
核心特性
- 自动连接管理(断线重连)
- 消息发布重试机制
- 轻量级设计,易于集成
- 支持消息确认模式
安装
go get github.com/yourusername/rabbitroutine
基本用法
1. 初始化连接
package main
import (
"fmt"
"log"
"time"
"github.com/yourusername/rabbitroutine"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 配置连接参数
config := rabbitroutine.Config{
URL: "amqp://guest:guest@localhost:5672/",
RetryInterval: 5 * time.Second, // 重试间隔
MaxRetries: 3, // 最大重试次数
}
// 创建连接管理器
connManager, err := rabbitroutine.NewConnectionManager(config)
if err != nil {
log.Fatalf("Failed to create connection manager: %v", err)
}
// 启动连接
go connManager.Start()
// 等待连接就绪
<-connManager.Ready()
fmt.Println("RabbitMQ connection established")
}
2. 发布消息
func publishMessage(connManager *rabbitroutine.ConnectionManager) {
// 获取通道
channel, err := connManager.Channel()
if err != nil {
log.Printf("Failed to get channel: %v", err)
return
}
defer channel.Close()
// 声明队列
_, err = channel.QueueDeclare(
"test_queue", // 队列名
true, // 持久化
false, // 自动删除
false, // 排他
false, // 无等待
nil, // 参数
)
if err != nil {
log.Printf("Failed to declare queue: %v", err)
return
}
// 发布消息
err = connManager.PublishWithRetry(
"", // 交换机
"test_queue", // 路由键
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello RabbitMQ!"),
},
)
if err != nil {
log.Printf("Failed to publish message: %v", err)
} else {
log.Println("Message published successfully")
}
}
3. 消费消息
func consumeMessages(connManager *rabbitroutine.ConnectionManager) {
// 获取通道
channel, err := connManager.Channel()
if err != nil {
log.Printf("Failed to get channel: %v", err)
return
}
defer channel.Close()
// 声明队列
queue, err := channel.QueueDeclare(
"test_queue", // 队列名
true, // 持久化
false, // 自动删除
false, // 排他
false, // 无等待
nil, // 参数
)
if err != nil {
log.Printf("Failed to declare queue: %v", err)
return
}
// 消费消息
msgs, err := channel.Consume(
queue.Name, // 队列
"", // 消费者标签
false, // 自动确认
false, // 排他
false, // 无本地
false, // 无等待
nil, // 参数
)
if err != nil {
log.Printf("Failed to register consumer: %v", err)
return
}
// 处理消息
go func() {
for msg := range msgs {
log.Printf("Received a message: %s", msg.Body)
// 手动确认消息
msg.Ack(false)
}
}()
}
高级功能
自定义重试策略
config := rabbitroutine.Config{
URL: "amqp://guest:guest@localhost:5672/",
RetryPolicy: func(attempt int) time.Duration {
// 指数退避策略
return time.Duration(math.Pow(2, float64(attempt))) * time.Second
},
MaxRetries: 5,
}
连接状态监控
// 监听连接状态变化
go func() {
for status := range connManager.Status() {
switch status {
case rabbitroutine.Connected:
log.Println("Connected to RabbitMQ")
case rabbitroutine.Disconnected:
log.Println("Disconnected from RabbitMQ")
case rabbitroutine.Reconnecting:
log.Println("Reconnecting to RabbitMQ...")
}
}
}()
发布确认模式
// 启用发布确认
channel, err := connManager.Channel()
if err != nil {
return err
}
defer channel.Close()
err = channel.Confirm(false) // 非阻塞确认模式
if err != nil {
return err
}
confirm := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
// 发布消息
err = channel.Publish(...)
if err != nil {
return err
}
// 等待确认
if confirmed := <-confirm; !confirmed.Ack {
return fmt.Errorf("message not acknowledged by broker")
}
最佳实践
- 对于关键消息,总是启用持久化队列和消息
- 使用发布确认模式确保消息到达broker
- 实现消费者幂等性处理
- 监控连接状态和重试次数
- 合理设置重试间隔避免雪崩效应
RabbitRoutine 通过封装常见的连接管理和重试逻辑,使得在 Golang 中使用 RabbitMQ 更加可靠和简单。你可以根据项目需求进一步扩展它的功能。