golang实现RabbitMQ自动重连与发布重试的轻量级插件库rabbitroutine的使用

Golang实现RabbitMQ自动重连与发布重试的轻量级插件库rabbitroutine的使用

概述

rabbitroutine是一个轻量级的Golang库,它为您处理RabbitMQ的自动重连和发布重试逻辑。该库旨在解决开发人员在使用RabbitMQ时遇到的连接问题。

RabbitMQ Failover Routine

功能特性

  • 分别处理连接错误和通道错误
  • 考虑在RabbitMQ重连后需要重新声明实体
  • 提供错误通知和连接重试尝试通知
  • 支持FireAndForgetPublisher和EnsurePublisher,可以使用RetryPublisher进行包装
  • 支持用于发布的通道池
  • 提供通道池大小统计

安装

go get github.com/furdarius/rabbitroutine

使用示例

消费者示例

// Consumer 声明您自己的RabbitMQ消费者,实现rabbitroutine.Consumer接口
type Consumer struct {}
func (c *Consumer) Declare(ctx context.Context, ch *amqp.Channel) error {
    // 在这里声明队列、交换机和绑定
    _, err := ch.QueueDeclare(
        "myqueue", // 队列名称
        true,      // 是否持久化
        false,     // 是否自动删除
        false,     // 是否排他
        false,     // 是否等待
        nil,       // 参数
    )
    return err
}
func (c *Consumer) Consume(ctx context.Context, ch *amqp.Channel) error {
    // 在这里开始消费消息
    msgs, err := ch.Consume(
        "myqueue", // 队列名称
        "",        // 消费者标签
        false,     // 是否自动确认
        false,     // 是否排他
        false,     // 是否不等待
        false,     // 是否不本地
        nil,       // 参数
    )
    if err != nil {
        return err
    }

    for {
        select {
        case msg := <-msgs:
            // 处理消息
            log.Printf("Received a message: %s", msg.Body)
            msg.Ack(false)
        case <-ctx.Done():
            return nil
        }
    }
}

url := "amqp://guest:guest@127.0.0.1:5672/"

conn := rabbitroutine.NewConnector(rabbitroutine.Config{
    // 重连等待时间
    Wait: 2 * time.Second,
})

ctx := context.Background()

go func() {
    err := conn.Dial(ctx, url)
    if err != nil {
        log.Println(err)
    }
}()

consumer := &Consumer{}
go func() {
    err := conn.StartConsumer(ctx, consumer)
    if err != nil {
        log.Println(err)
    }
}()

发布者示例

ctx := context.Background()

url := "amqp://guest:guest@127.0.0.1:5672/"

conn := rabbitroutine.NewConnector(rabbitroutine.Config{
    // 重连等待时间
    Wait: 2 * time.Second,
})

// 创建通道池
pool := rabbitroutine.NewPool(conn)
// 创建确保发布的发布者
ensurePub := rabbitroutine.NewEnsurePublisher(pool)
// 使用重试机制包装发布者
pub := rabbitroutine.NewRetryPublisher(
    ensurePub,
    rabbitroutine.PublishMaxAttemptsSetup(16),          // 最大重试次数
    rabbitroutine.PublishDelaySetup(rabbitroutine.LinearDelay(10*time.Millisecond)), // 重试延迟
)

// 启动连接
go conn.Dial(ctx, url)

// 发布消息
err := pub.Publish(ctx, "myexch", "myqueue", amqp.Publishing{
    Body: []byte("message"),
})
if err != nil {
    log.Println("publish error:", err)
}

贡献指南

欢迎提交Pull Request。创建您的Pull Request时,请确保包含覆盖您更改的测试或示例,并且您的提交代表连贯的更改,包括更改原因。

要运行集成测试,请确保您在任何主机上运行RabbitMQ:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

然后导出环境变量AMQP_URL=amqp://host/并运行go test -tags integration

AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -race -cpu=1,2 -tags integration -timeout 5s

使用golangci-lint检查代码:

golangci-lint run ./...

更多关于golang实现RabbitMQ自动重连与发布重试的轻量级插件库rabbitroutine的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现RabbitMQ自动重连与发布重试的轻量级插件库rabbitroutine的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


RabbitRoutine - Golang RabbitMQ 自动重连与发布重试库

RabbitRoutine 是一个轻量级的 Golang 库,用于简化 RabbitMQ 连接管理和消息发布重试机制。下面我将介绍如何使用它来实现可靠的 RabbitMQ 消息传递。

核心特性

  1. 自动连接管理(断线重连)
  2. 消息发布重试机制
  3. 轻量级设计,易于集成
  4. 支持消息确认模式

安装

go get github.com/yourusername/rabbitroutine

基本用法

1. 初始化连接

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/yourusername/rabbitroutine"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// 配置连接参数
	config := rabbitroutine.Config{
		URL:           "amqp://guest:guest@localhost:5672/",
		RetryInterval: 5 * time.Second, // 重试间隔
		MaxRetries:    3,               // 最大重试次数
	}

	// 创建连接管理器
	connManager, err := rabbitroutine.NewConnectionManager(config)
	if err != nil {
		log.Fatalf("Failed to create connection manager: %v", err)
	}

	// 启动连接
	go connManager.Start()

	// 等待连接就绪
	<-connManager.Ready()
	fmt.Println("RabbitMQ connection established")
}

2. 发布消息

func publishMessage(connManager *rabbitroutine.ConnectionManager) {
	// 获取通道
	channel, err := connManager.Channel()
	if err != nil {
		log.Printf("Failed to get channel: %v", err)
		return
	}
	defer channel.Close()

	// 声明队列
	_, err = channel.QueueDeclare(
		"test_queue", // 队列名
		true,         // 持久化
		false,        // 自动删除
		false,        // 排他
		false,        // 无等待
		nil,          // 参数
	)
	if err != nil {
		log.Printf("Failed to declare queue: %v", err)
		return
	}

	// 发布消息
	err = connManager.PublishWithRetry(
		"",            // 交换机
		"test_queue",  // 路由键
		false,         // 强制
		false,         // 立即
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte("Hello RabbitMQ!"),
		},
	)
	if err != nil {
		log.Printf("Failed to publish message: %v", err)
	} else {
		log.Println("Message published successfully")
	}
}

3. 消费消息

func consumeMessages(connManager *rabbitroutine.ConnectionManager) {
	// 获取通道
	channel, err := connManager.Channel()
	if err != nil {
		log.Printf("Failed to get channel: %v", err)
		return
	}
	defer channel.Close()

	// 声明队列
	queue, err := channel.QueueDeclare(
		"test_queue", // 队列名
		true,         // 持久化
		false,        // 自动删除
		false,        // 排他
		false,        // 无等待
		nil,          // 参数
	)
	if err != nil {
		log.Printf("Failed to declare queue: %v", err)
		return
	}

	// 消费消息
	msgs, err := channel.Consume(
		queue.Name, // 队列
		"",         // 消费者标签
		false,      // 自动确认
		false,      // 排他
		false,      // 无本地
		false,      // 无等待
		nil,        // 参数
	)
	if err != nil {
		log.Printf("Failed to register consumer: %v", err)
		return
	}

	// 处理消息
	go func() {
		for msg := range msgs {
			log.Printf("Received a message: %s", msg.Body)
			// 手动确认消息
			msg.Ack(false)
		}
	}()
}

高级功能

自定义重试策略

config := rabbitroutine.Config{
	URL: "amqp://guest:guest@localhost:5672/",
	RetryPolicy: func(attempt int) time.Duration {
		// 指数退避策略
		return time.Duration(math.Pow(2, float64(attempt))) * time.Second
	},
	MaxRetries: 5,
}

连接状态监控

// 监听连接状态变化
go func() {
	for status := range connManager.Status() {
		switch status {
		case rabbitroutine.Connected:
			log.Println("Connected to RabbitMQ")
		case rabbitroutine.Disconnected:
			log.Println("Disconnected from RabbitMQ")
		case rabbitroutine.Reconnecting:
			log.Println("Reconnecting to RabbitMQ...")
		}
	}
}()

发布确认模式

// 启用发布确认
channel, err := connManager.Channel()
if err != nil {
	return err
}
defer channel.Close()

err = channel.Confirm(false) // 非阻塞确认模式
if err != nil {
	return err
}

confirm := channel.NotifyPublish(make(chan amqp.Confirmation, 1))

// 发布消息
err = channel.Publish(...)
if err != nil {
	return err
}

// 等待确认
if confirmed := <-confirm; !confirmed.Ack {
	return fmt.Errorf("message not acknowledged by broker")
}

最佳实践

  1. 对于关键消息,总是启用持久化队列和消息
  2. 使用发布确认模式确保消息到达broker
  3. 实现消费者幂等性处理
  4. 监控连接状态和重试次数
  5. 合理设置重试间隔避免雪崩效应

RabbitRoutine 通过封装常见的连接管理和重试逻辑,使得在 Golang 中使用 RabbitMQ 更加可靠和简单。你可以根据项目需求进一步扩展它的功能。

回到顶部