golang声明式配置的RabbitMQ客户端插件库go-mq的使用
golang声明式配置的RabbitMQ客户端插件库go-mq的使用
关于
go-mq是一个Golang库,提供了以声明式方式封装RabbitMQ(AMQP)实体创建和配置的能力,如队列、交换器、生产者和消费者,只需单个配置文件即可完成配置。
交换器、队列和生产者将在后台自动初始化。
go-mq支持同步和异步两种生产者模式。
go-mq具有在连接关闭或网络错误时自动重连的功能,可以通过reconnect_delay
选项配置每次重连尝试之间的延迟。
最低Go版本要求
1.16
安装
go get -u github.com/cheshir/go-mq/v2
配置
可以通过mq.Config结构体直接配置,或从配置文件填充配置。
支持的配置标签:
- yaml
- json
- mapstructure
可用配置选项示例:
dsn: "amqp://login:password@host:port/virtual_host" # 集群连接可使用逗号分隔的列表
reconnect_delay: 5s # 连接尝试之间的间隔
test_mode: false # 切换到使用模拟代理,默认为false
exchanges:
- name: "exchange_name"
type: "direct"
options:
auto_delete: false
durable: false
internal: false
no_wait: false
queues:
- name: "queue_name"
exchange: "exchange_name"
routing_key: "route"
binding_options:
no_wait: false
options:
auto_delete: false
durable: false
exclusive: false
no_wait: false
producers:
- name: "producer_name"
buffer_size: 10 # 在发布大量消息时可以缓冲的消息数量
exchange: "exchange_name"
routing_key: "route"
sync: false # 指定生产者工作在同步还是异步模式
options:
content_type: "application/json"
delivery_mode: 2 # 1-非持久化,2-持久化
consumers:
- name: "consumer_name"
queue: "queue_name"
workers: 1 # 工作线程数,默认为1
prefetch_count: 0 # 每个工作线程预取消息数
prefetch_size: 0 # 每个工作线程预取消息大小
options:
no_ack: false
no_local: false
no_wait: false
exclusive: false
完整示例
基本使用示例
package main
import (
"log"
"github.com/cheshir/go-mq"
)
func main() {
// 配置RabbitMQ连接和实体
config := mq.Config{
DSN: "amqp://guest:guest@localhost:5672/",
Exchanges: []mq.ExchangeConfig{
{
Name: "test_exchange",
Type: "direct",
},
},
Queues: []mq.QueueConfig{
{
Name: "test_queue",
Exchange: "test_exchange",
RoutingKey: "test_route",
},
},
Producers: []mq.ProducerConfig{
{
Name: "test_producer",
Exchange: "test_exchange",
RoutingKey: "test_route",
},
},
Consumers: []mq.ConsumerConfig{
{
Name: "test_consumer",
Queue: "test_queue",
Workers: 1,
},
},
}
// 创建MQ实例
queue, err := mq.New(config)
if err != nil {
log.Fatal(err)
}
// 处理错误
go handleErrors(queue.Error())
// 获取生产者
producer := queue.AsyncProducer("test_producer")
// 发布消息
err = producer.Produce([]byte("test message"))
if err != nil {
log.Println("Failed to publish message:", err)
}
// 获取消费者
consumer := queue.Consumer("test_consumer")
// 启动消费者
consumer.Start(func(message mq.Message) {
log.Printf("Received message: %s", message.Body)
message.Ack(false) // 确认消息
})
// 其他业务逻辑...
}
func handleErrors(errors <-chan error) {
for err := range errors {
log.Println("MQ error:", err)
}
}
同步生产者示例
// 配置同步生产者
config := mq.Config{
// ...其他配置
Producers: []mq.ProducerConfig{
{
Name: "sync_producer",
Exchange: "test_exchange",
RoutingKey: "test_route",
Sync: true, // 设置为同步模式
},
},
}
// 获取同步生产者
syncProducer := queue.SyncProducer("sync_producer")
// 发布消息并立即获取结果
err := syncProducer.Produce([]byte("sync message"))
if err != nil {
log.Println("Failed to publish sync message:", err)
}
错误处理
所有错误都可通过导出的通道访问:
func handleMQErrors(errors <-chan error) {
for err := range errors {
log.Println(err)
}
}
// 在主函数中
go handleMQErrors(queue.Error())
如果通道已满,新的错误将被丢弃。
同步生产者的错误不会通过错误通道访问,因为它们会直接返回。
测试
有些情况只能使用真实代理测试,有些则只能使用模拟代理测试。
如果可以使用真实代理运行测试:
go test -mock-broker=0
否则将使用模拟代理。
从旧版本升级
从v1升级到v2
New()
返回*MessageQueue
而不是接口- 最低Go版本更新为1.16
从0.x升级到1.x
GetConsumer()
方法重命名为Consumer()
GetProducer()
方法已移除,改用AsyncProducer()
或SyncProducer()
更多关于golang声明式配置的RabbitMQ客户端插件库go-mq的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang声明式配置的RabbitMQ客户端插件库go-mq的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
go-mq: 声明式配置的RabbitMQ客户端库
go-mq 是一个基于声明式配置的 RabbitMQ 客户端库,它简化了在 Golang 中使用 RabbitMQ 的复杂性,让开发者可以通过配置而非代码来定义队列、交换机和绑定关系。
核心特性
- 声明式配置:通过结构体标签定义队列、交换机和绑定
- 自动连接管理:自动处理连接断开和重连
- 多种消费模式:支持简单消费者、工作队列、发布/订阅等模式
- 中间件支持:可插入日志、重试、监控等中间件
安装
go get github.com/wx-up/go-mq
基本使用示例
1. 生产者示例
package main
import (
"context"
"fmt"
"time"
"github.com/wx-up/go-mq"
)
// 定义消息结构体
type OrderMessage struct {
OrderID string `json:"order_id"`
ProductID int `json:"product_id"`
Quantity int `json:"quantity"`
}
func main() {
// 创建RabbitMQ客户端
client, err := mq.New(
mq.WithAddress("amqp://guest:guest@localhost:5672/"),
mq.WithLogger(mq.DefaultLogger),
)
if err != nil {
panic(err)
}
defer client.Close()
// 创建生产者
producer := client.NewProducer("order_exchange")
// 发送消息
msg := OrderMessage{
OrderID: "12345",
ProductID: 1001,
Quantity: 2,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = producer.Publish(ctx, msg,
mq.WithRoutingKey("order.created"),
mq.WithHeader("retry", "3"),
)
if err != nil {
fmt.Printf("发送消息失败: %v\n", err)
return
}
fmt.Println("消息发送成功")
}
2. 消费者示例
package main
import (
"context"
"fmt"
"time"
"github.com/wx-up/go-mq"
)
// 定义消息结构体
type OrderMessage struct {
OrderID string `json:"order_id"`
ProductID int `json:"product_id"`
Quantity int `json:"quantity"`
}
func main() {
// 创建RabbitMQ客户端
client, err := mq.New(
mq.WithAddress("amqp://guest:guest@localhost:5672/"),
mq.WithLogger(mq.DefaultLogger),
)
if err != nil {
panic(err)
}
defer client.Close()
// 定义消费者配置
consumerCfg := &mq.ConsumerConfig{
Exchange: "order_exchange",
Queue: "order_queue",
Routing: []string{"order.created"},
Concurrency: 5, // 并发消费者数量
}
// 创建消费者
consumer := client.NewConsumer(consumerCfg)
// 定义消息处理函数
handler := func(ctx context.Context, msg *OrderMessage) error {
fmt.Printf("收到订单消息: %+v\n", msg)
// 处理订单逻辑...
return nil
}
// 启动消费者
ctx := context.Background()
err = consumer.Start(ctx, handler)
if err != nil {
fmt.Printf("消费者启动失败: %v\n", err)
return
}
// 保持运行
select {}
}
高级配置
声明式队列定义
type OrderEvent struct {
EventType string `json:"event_type"`
OrderID string `json:"order_id"`
// 通过结构体标签声明队列属性
Queue string `mq:"queue=order_events;durable=true;auto_delete=false"`
Exchange string `mq:"exchange=order_events;type=topic;durable=true"`
Routing string `mq:"routing=order.*"`
}
func main() {
client, err := mq.New(
mq.WithAddress("amqp://guest:guest@localhost:5672/"),
mq.WithDeclare([]mq.Declarer{
&OrderEvent{}, // 自动声明队列、交换机和绑定
}),
)
if err != nil {
panic(err)
}
defer client.Close()
// 使用已声明的队列和交换机...
}
中间件使用
func main() {
client, err := mq.New(
mq.WithAddress("amqp://guest:guest@localhost:5672/"),
mq.WithConsumerMiddlewares(
// 日志中间件
mq.LoggingMiddleware(),
// 重试中间件
mq.RetryMiddleware(3, 1*time.Second),
// 自定义中间件
func(next mq.HandlerFunc) mq.HandlerFunc {
return func(ctx context.Context, msg interface{}) error {
start := time.Now()
err := next(ctx, msg)
fmt.Printf("处理耗时: %v\n", time.Since(start))
return err
}
},
),
)
if err != nil {
panic(err)
}
// 创建消费者...
}
错误处理与监控
func main() {
// 创建带监控的客户端
client, err := mq.New(
mq.WithAddress("amqp://guest:guest@localhost:5672/"),
mq.WithMetrics(&mq.MetricsConfig{
Namespace: "myapp",
Subsystem: "rabbitmq",
}),
)
if err != nil {
panic(err)
}
// 错误处理示例
consumer := client.NewConsumer(&mq.ConsumerConfig{
Exchange: "order_exchange",
Queue: "order_queue",
ErrorHandler: func(ctx context.Context, err error, msg interface{}) {
// 自定义错误处理逻辑
fmt.Printf("处理消息失败: %v, 消息: %+v\n", err, msg)
// 可以选择重试、记录或发送到死信队列
},
})
// ...
}
最佳实践
- 连接管理:始终使用
defer client.Close()
确保连接关闭 - 消息序列化:默认使用 JSON,可通过
mq.WithSerializer()
配置其他序列化方式 - 错误处理:为消费者配置适当的错误处理逻辑
- 资源声明:在生产环境预先声明所有队列和交换机
- 监控:集成监控中间件跟踪消息处理情况
go-mq 通过声明式配置简化了 RabbitMQ 的使用,同时保持了足够的灵活性来处理各种消息模式。更多高级用法和配置选项可以参考项目的官方文档。