golang AMQP交换机和队列轻量级封装插件库rabbus的使用
golang AMQP交换机和队列轻量级封装插件库rabbus的使用
Rabbus 🚌 ✨
Rabbus是一个轻量级的AMQP交换机和队列封装库,主要特性包括:
- 基于amqp库的轻量级封装
- 内存重试机制,支持指数退避发送消息
- 使用circuit breaker保护生产者调用
- 自动重连RabbitMQ代理
- Go channel API
安装
go get -u github.com/rafaeljesus/rabbus
使用
Rabbus提供了发送和监听RabbitMQ消息的接口。
发送消息示例
import (
"context"
"time"
"github.com/rafaeljesus/rabbus"
)
func main() {
timeout := time.After(time.Second * 3)
cbStateChangeFunc := func(name, from, to string) {
// 状态变更时的处理逻辑
}
r, err := rabbus.New(
rabbusDsn, // RabbitMQ连接DSN
rabbus.Durable(true), // 持久化
rabbus.Attempts(5), // 重试次数
rabbus.Sleep(time.Second*2), // 重试间隔
rabbus.Threshold(3), // 熔断阈值
rabbus.OnStateChange(cbStateChangeFunc), // 状态变更回调
)
if err != nil {
// 处理错误
}
defer func(r Rabbus) {
if err := r.Close(); err != nil {
// 处理错误
}
}(r)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go r.Run(ctx)
msg := rabbus.Message{
Exchange: "test_ex", // 交换机名称
Kind: "topic", // 交换机类型
Key: "test_key", // 路由键
Payload: []byte(`foo`), // 消息内容
}
r.EmitAsync() <- msg // 异步发送消息
for {
select {
case <-r.EmitOk():
// 消息发送成功
case <-r.EmitErr():
// 消息发送失败
case <-timeout:
// 处理超时
}
}
}
监听消息示例
import (
"context"
"encoding/json"
"time"
"github.com/rafaeljesus/rabbus"
)
func main() {
timeout := time.After(time.Second * 3)
cbStateChangeFunc := func(name, from, to string) {
// 状态变更时的处理逻辑
}
r, err := rabbus.New(
rabbusDsn,
rabbus.Durable(true),
rabbus.Attempts(5),
rabbus.Sleep(time.Second*2),
rabbus.Threshold(3),
rabbus.OnStateChange(cbStateChangeFunc),
)
if err != nil {
// 处理错误
}
defer func(r Rabbus) {
if err := r.Close(); err != nil {
// 处理错误
}
}(r)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go r.Run(ctx)
// 监听消息配置
messages, err := r.Listen(rabbus.ListenConfig{
Exchange: "events_ex", // 交换机名称
Kind: "topic", // 交换机类型
Key: "events_key", // 路由键
Queue: "events_q", // 队列名称
DeclareArgs: rabbus.NewDeclareArgs().WithMessageTTL(15 * time.Minute).With("foo", "bar"), // 声明参数
BindArgs: rabbus.NewBindArgs().With("baz", "qux"), // 绑定参数
})
if err != nil {
// 处理添加监听器时的错误
}
defer close(messages)
// 处理接收到的消息
go func(messages chan ConsumerMessage) {
for m := range messages {
m.Ack(false) // 确认消息
}
}(messages)
}
贡献指南
- Fork项目
- 创建特性分支 (
git checkout -b my-new-feature
) - 提交更改 (
git commit -am 'Add some feature'
) - 推送到分支 (
git push origin my-new-feature
) - 创建Pull Request
项目状态
[构建状态] [Go报告卡] [Go文档]
更多关于golang AMQP交换机和队列轻量级封装插件库rabbus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang AMQP交换机和队列轻量级封装插件库rabbus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
RabbitBus - Golang AMQP 轻量级封装库使用指南
RabbitBus 是一个轻量级的 Golang AMQP (RabbitMQ) 封装库,提供了简单易用的 API 来处理消息队列的发布和消费。下面我将详细介绍其使用方法。
安装
go get github.com/rafaeljesus/rabbus
基本使用
1. 初始化连接
package main
import (
"log"
"time"
"github.com/rafaeljesus/rabbus"
)
func main() {
rb, err := rabbus.New(
"amqp://guest:guest@localhost:5672/",
rabbus.Durable(true),
rabbus.Attempts(5),
rabbus.Sleep(time.Second*2),
rabbus.Timeout(time.Second*10),
)
if err != nil {
log.Fatalf("Failed to init rabbus: %v", err)
}
defer rb.Close()
// 其他操作...
}
2. 声明交换机和队列
// 声明交换机
err := rb.DeclareExchange(rabbus.ExchangeConfig{
Name: "test_exchange",
Kind: "direct",
Durable: true,
AutoDelete: false,
})
if err != nil {
log.Fatalf("Failed to declare exchange: %v", err)
}
// 声明队列
err = rb.DeclareQueue(rabbus.QueueConfig{
Name: "test_queue",
Durable: true,
AutoDelete: false,
})
if err != nil {
log.Fatalf("Failed to declare queue: %v", err)
}
// 绑定队列到交换机
err = rb.BindQueue(rabbus.BindConfig{
Exchange: "test_exchange",
Queue: "test_queue",
RoutingKey: "test_routing_key",
})
if err != nil {
log.Fatalf("Failed to bind queue: %v", err)
}
3. 发布消息
// 发布消息
msg := rabbus.Message{
Exchange: "test_exchange",
Key: "test_routing_key",
Payload: []byte("Hello RabbitMQ"),
DeliveryMode: rabbus.Persistent,
}
select {
case rb.Emit() <- msg:
log.Println("Message published")
case <-time.After(time.Second):
log.Println("Publish timeout")
}
4. 消费消息
// 消费消息
consumer, err := rb.Consumer("test_queue")
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
go func() {
for msg := range consumer {
log.Printf("Received message: %s", string(msg.Body))
msg.Ack(false) // 确认消息
}
}()
// 保持程序运行
select {}
高级特性
1. 重试机制
RabbitBus 内置了重试机制,可以在连接失败时自动重试:
rb, err := rabbus.New(
"amqp://guest:guest@localhost:5672/",
rabbus.Attempts(3), // 最大重试次数
rabbus.Sleep(time.Second*5), // 重试间隔
)
2. 消息确认模式
// 自动确认模式
consumer, err := rb.Consumer("test_queue", rabbus.AutoAck(true))
// 手动确认模式
consumer, err := rb.Consumer("test_queue")
for msg := range consumer {
// 处理消息
if err := process(msg); err != nil {
msg.Nack(false, true) // 拒绝并重新入队
} else {
msg.Ack(false) // 确认处理完成
}
}
3. 消息优先级
msg := rabbus.Message{
Exchange: "test_exchange",
Key: "test_routing_key",
Payload: []byte("High priority message"),
Priority: 5, // 优先级 0-9
}
4. 死信队列配置
// 声明死信队列
err := rb.DeclareQueue(rabbus.QueueConfig{
Name: "dlx_queue",
Durable: true,
Arguments: rabbus.QueueArguments{
"x-message-ttl": 60000, // 60秒TTL
"x-dead-letter-exchange": "dlx_exchange",
},
})
完整示例
package main
import (
"log"
"time"
"github.com/rafaeljesus/rabbus"
)
func main() {
// 初始化连接
rb, err := rabbus.New(
"amqp://guest:guest@localhost:5672/",
rabbus.Durable(true),
rabbus.Attempts(5),
rabbus.Sleep(time.Second*2),
)
if err != nil {
log.Fatal(err)
}
defer rb.Close()
// 声明交换机
if err := rb.DeclareExchange(rabbus.ExchangeConfig{
Name: "test_ex",
Kind: "direct",
Durable: true,
}); err != nil {
log.Fatal(err)
}
// 声明队列
if err := rb.DeclareQueue(rabbus.QueueConfig{
Name: "test_q",
Durable: true,
}); err != nil {
log.Fatal(err)
}
// 绑定队列
if err := rb.BindQueue(rabbus.BindConfig{
Exchange: "test_ex",
Queue: "test_q",
RoutingKey: "test_key",
}); err != nil {
log.Fatal(err)
}
// 发布消息
go func() {
for i := 0; i < 10; i++ {
msg := rabbus.Message{
Exchange: "test_ex",
Key: "test_key",
Payload: []byte("message " + string(i)),
}
select {
case rb.Emit() <- msg:
log.Printf("Published message %d", i)
case <-time.After(time.Second):
log.Println("Publish timeout")
}
time.Sleep(time.Second)
}
}()
// 消费消息
consumer, err := rb.Consumer("test_q")
if err != nil {
log.Fatal(err)
}
for msg := range consumer {
log.Printf("Received: %s", string(msg.Body))
msg.Ack(false)
}
}
总结
RabbitBus 提供了以下主要优势:
- 简洁的 API 设计
- 自动重连机制
- 灵活的配置选项
- 支持多种消息模式
- 轻量级无额外依赖
对于需要简单可靠地集成 RabbitMQ 的 Golang 应用来说,RabbitBus 是一个不错的选择。