golang声明式配置的RabbitMQ客户端插件库go-mq的使用

golang声明式配置的RabbitMQ客户端插件库go-mq的使用

关于

go-mq是一个Golang库,提供了以声明式方式封装RabbitMQ(AMQP)实体创建和配置的能力,如队列、交换器、生产者和消费者,只需单个配置文件即可完成配置。

交换器、队列和生产者将在后台自动初始化。

go-mq支持同步和异步两种生产者模式。

go-mq具有在连接关闭或网络错误时自动重连的功能,可以通过reconnect_delay选项配置每次重连尝试之间的延迟。

最低Go版本要求

1.16

安装

go get -u github.com/cheshir/go-mq/v2

配置

可以通过mq.Config结构体直接配置,或从配置文件填充配置。

支持的配置标签:

  • yaml
  • json
  • mapstructure

可用配置选项示例:

dsn: "amqp://login:password@host:port/virtual_host" # 集群连接可使用逗号分隔的列表
reconnect_delay: 5s                     # 连接尝试之间的间隔
test_mode: false                        # 切换到使用模拟代理,默认为false

exchanges:
  - name: "exchange_name"
    type: "direct"
    options:
      auto_delete: false
      durable: false
      internal: false
      no_wait: false

queues:
  - name: "queue_name"
    exchange: "exchange_name"
    routing_key: "route"
    binding_options:
      no_wait: false
    options:
      auto_delete: false
      durable: false
      exclusive: false
      no_wait: false

producers:
  - name: "producer_name"
    buffer_size: 10                      # 在发布大量消息时可以缓冲的消息数量
    exchange: "exchange_name"
    routing_key: "route"
    sync: false                          # 指定生产者工作在同步还是异步模式
    options:
      content_type:  "application/json"
      delivery_mode: 2                   # 1-非持久化,2-持久化

consumers:
  - name: "consumer_name"
    queue: "queue_name"
    workers: 1                           # 工作线程数,默认为1
    prefetch_count: 0                    # 每个工作线程预取消息数
    prefetch_size: 0                     # 每个工作线程预取消息大小
    options:
      no_ack: false
      no_local: false
      no_wait: false
      exclusive: false

完整示例

基本使用示例

package main

import (
	"log"

	"github.com/cheshir/go-mq"
)

func main() {
	// 配置RabbitMQ连接和实体
	config := mq.Config{
		DSN: "amqp://guest:guest@localhost:5672/",
		Exchanges: []mq.ExchangeConfig{
			{
				Name: "test_exchange",
				Type: "direct",
			},
		},
		Queues: []mq.QueueConfig{
			{
				Name:       "test_queue",
				Exchange:   "test_exchange",
				RoutingKey: "test_route",
			},
		},
		Producers: []mq.ProducerConfig{
			{
				Name:       "test_producer",
				Exchange:   "test_exchange",
				RoutingKey: "test_route",
			},
		},
		Consumers: []mq.ConsumerConfig{
			{
				Name:    "test_consumer",
				Queue:   "test_queue",
				Workers: 1,
			},
		},
	}

	// 创建MQ实例
	queue, err := mq.New(config)
	if err != nil {
		log.Fatal(err)
	}

	// 处理错误
	go handleErrors(queue.Error())

	// 获取生产者
	producer := queue.AsyncProducer("test_producer")

	// 发布消息
	err = producer.Produce([]byte("test message"))
	if err != nil {
		log.Println("Failed to publish message:", err)
	}

	// 获取消费者
	consumer := queue.Consumer("test_consumer")

	// 启动消费者
	consumer.Start(func(message mq.Message) {
		log.Printf("Received message: %s", message.Body)
		message.Ack(false) // 确认消息
	})

	// 其他业务逻辑...
}

func handleErrors(errors <-chan error) {
	for err := range errors {
		log.Println("MQ error:", err)
	}
}

同步生产者示例

// 配置同步生产者
config := mq.Config{
	// ...其他配置
	Producers: []mq.ProducerConfig{
		{
			Name:       "sync_producer",
			Exchange:   "test_exchange",
			RoutingKey: "test_route",
			Sync:       true, // 设置为同步模式
		},
	},
}

// 获取同步生产者
syncProducer := queue.SyncProducer("sync_producer")

// 发布消息并立即获取结果
err := syncProducer.Produce([]byte("sync message"))
if err != nil {
	log.Println("Failed to publish sync message:", err)
}

错误处理

所有错误都可通过导出的通道访问:

func handleMQErrors(errors <-chan error) {
	for err := range errors {
		log.Println(err)
	}
}

// 在主函数中
go handleMQErrors(queue.Error())

如果通道已满,新的错误将被丢弃。

同步生产者的错误不会通过错误通道访问,因为它们会直接返回。

测试

有些情况只能使用真实代理测试,有些则只能使用模拟代理测试。

如果可以使用真实代理运行测试:

go test -mock-broker=0

否则将使用模拟代理。

从旧版本升级

从v1升级到v2

  • New()返回*MessageQueue而不是接口
  • 最低Go版本更新为1.16

从0.x升级到1.x

  • GetConsumer()方法重命名为Consumer()
  • GetProducer()方法已移除,改用AsyncProducer()SyncProducer()

更多关于golang声明式配置的RabbitMQ客户端插件库go-mq的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang声明式配置的RabbitMQ客户端插件库go-mq的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


go-mq: 声明式配置的RabbitMQ客户端库

go-mq 是一个基于声明式配置的 RabbitMQ 客户端库,它简化了在 Golang 中使用 RabbitMQ 的复杂性,让开发者可以通过配置而非代码来定义队列、交换机和绑定关系。

核心特性

  1. 声明式配置:通过结构体标签定义队列、交换机和绑定
  2. 自动连接管理:自动处理连接断开和重连
  3. 多种消费模式:支持简单消费者、工作队列、发布/订阅等模式
  4. 中间件支持:可插入日志、重试、监控等中间件

安装

go get github.com/wx-up/go-mq

基本使用示例

1. 生产者示例

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/wx-up/go-mq"
)

// 定义消息结构体
type OrderMessage struct {
	OrderID   string `json:"order_id"`
	ProductID int    `json:"product_id"`
	Quantity  int    `json:"quantity"`
}

func main() {
	// 创建RabbitMQ客户端
	client, err := mq.New(
		mq.WithAddress("amqp://guest:guest@localhost:5672/"),
		mq.WithLogger(mq.DefaultLogger),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// 创建生产者
	producer := client.NewProducer("order_exchange")

	// 发送消息
	msg := OrderMessage{
		OrderID:   "12345",
		ProductID: 1001,
		Quantity:  2,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err = producer.Publish(ctx, msg, 
		mq.WithRoutingKey("order.created"),
		mq.WithHeader("retry", "3"),
	)
	if err != nil {
		fmt.Printf("发送消息失败: %v\n", err)
		return
	}

	fmt.Println("消息发送成功")
}

2. 消费者示例

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/wx-up/go-mq"
)

// 定义消息结构体
type OrderMessage struct {
	OrderID   string `json:"order_id"`
	ProductID int    `json:"product_id"`
	Quantity  int    `json:"quantity"`
}

func main() {
	// 创建RabbitMQ客户端
	client, err := mq.New(
		mq.WithAddress("amqp://guest:guest@localhost:5672/"),
		mq.WithLogger(mq.DefaultLogger),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// 定义消费者配置
	consumerCfg := &mq.ConsumerConfig{
		Exchange: "order_exchange",
		Queue:    "order_queue",
		Routing:  []string{"order.created"},
		Concurrency: 5, // 并发消费者数量
	}

	// 创建消费者
	consumer := client.NewConsumer(consumerCfg)

	// 定义消息处理函数
	handler := func(ctx context.Context, msg *OrderMessage) error {
		fmt.Printf("收到订单消息: %+v\n", msg)
		// 处理订单逻辑...
		return nil
	}

	// 启动消费者
	ctx := context.Background()
	err = consumer.Start(ctx, handler)
	if err != nil {
		fmt.Printf("消费者启动失败: %v\n", err)
		return
	}

	// 保持运行
	select {}
}

高级配置

声明式队列定义

type OrderEvent struct {
	EventType string `json:"event_type"`
	OrderID   string `json:"order_id"`
	
	// 通过结构体标签声明队列属性
	Queue    string `mq:"queue=order_events;durable=true;auto_delete=false"`
	Exchange string `mq:"exchange=order_events;type=topic;durable=true"`
	Routing  string `mq:"routing=order.*"`
}

func main() {
	client, err := mq.New(
		mq.WithAddress("amqp://guest:guest@localhost:5672/"),
		mq.WithDeclare([]mq.Declarer{
			&OrderEvent{}, // 自动声明队列、交换机和绑定
		}),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	
	// 使用已声明的队列和交换机...
}

中间件使用

func main() {
	client, err := mq.New(
		mq.WithAddress("amqp://guest:guest@localhost:5672/"),
		mq.WithConsumerMiddlewares(
			// 日志中间件
			mq.LoggingMiddleware(),
			// 重试中间件
			mq.RetryMiddleware(3, 1*time.Second),
			// 自定义中间件
			func(next mq.HandlerFunc) mq.HandlerFunc {
				return func(ctx context.Context, msg interface{}) error {
					start := time.Now()
					err := next(ctx, msg)
					fmt.Printf("处理耗时: %v\n", time.Since(start))
					return err
				}
			},
		),
	)
	if err != nil {
		panic(err)
	}
	
	// 创建消费者...
}

错误处理与监控

func main() {
	// 创建带监控的客户端
	client, err := mq.New(
		mq.WithAddress("amqp://guest:guest@localhost:5672/"),
		mq.WithMetrics(&mq.MetricsConfig{
			Namespace: "myapp",
			Subsystem: "rabbitmq",
		}),
	)
	if err != nil {
		panic(err)
	}
	
	// 错误处理示例
	consumer := client.NewConsumer(&mq.ConsumerConfig{
		Exchange: "order_exchange",
		Queue:    "order_queue",
		ErrorHandler: func(ctx context.Context, err error, msg interface{}) {
			// 自定义错误处理逻辑
			fmt.Printf("处理消息失败: %v, 消息: %+v\n", err, msg)
			// 可以选择重试、记录或发送到死信队列
		},
	})
	
	// ...
}

最佳实践

  1. 连接管理:始终使用 defer client.Close() 确保连接关闭
  2. 消息序列化:默认使用 JSON,可通过 mq.WithSerializer() 配置其他序列化方式
  3. 错误处理:为消费者配置适当的错误处理逻辑
  4. 资源声明:在生产环境预先声明所有队列和交换机
  5. 监控:集成监控中间件跟踪消息处理情况

go-mq 通过声明式配置简化了 RabbitMQ 的使用,同时保持了足够的灵活性来处理各种消息模式。更多高级用法和配置选项可以参考项目的官方文档。

回到顶部