golang AMQP交换机和队列轻量级封装插件库rabbus的使用

golang AMQP交换机和队列轻量级封装插件库rabbus的使用

Rabbus 🚌 ✨

Rabbus是一个轻量级的AMQP交换机和队列封装库,主要特性包括:

  • 基于amqp库的轻量级封装
  • 内存重试机制,支持指数退避发送消息
  • 使用circuit breaker保护生产者调用
  • 自动重连RabbitMQ代理
  • Go channel API

安装

go get -u github.com/rafaeljesus/rabbus

使用

Rabbus提供了发送和监听RabbitMQ消息的接口。

发送消息示例

import (
	"context"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	timeout := time.After(time.Second * 3)
	cbStateChangeFunc := func(name, from, to string) {
		// 状态变更时的处理逻辑
	}
	r, err := rabbus.New(
		rabbusDsn, // RabbitMQ连接DSN
		rabbus.Durable(true), // 持久化
		rabbus.Attempts(5),   // 重试次数
		rabbus.Sleep(time.Second*2), // 重试间隔
		rabbus.Threshold(3),  // 熔断阈值
		rabbus.OnStateChange(cbStateChangeFunc), // 状态变更回调
	)
	if err != nil {
		// 处理错误
	}

	defer func(r Rabbus) {
		if err := r.Close(); err != nil {
			// 处理错误
		}
	}(r)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go r.Run(ctx)

	msg := rabbus.Message{
		Exchange: "test_ex", // 交换机名称
		Kind:     "topic",   // 交换机类型
		Key:      "test_key", // 路由键
		Payload:  []byte(`foo`), // 消息内容
	}

	r.EmitAsync() <- msg // 异步发送消息

	for {
		select {
		case <-r.EmitOk():
			// 消息发送成功
		case <-r.EmitErr():
			// 消息发送失败
		case <-timeout:
			// 处理超时
		}
	}
}

监听消息示例

import (
	"context"
	"encoding/json"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	timeout := time.After(time.Second * 3)
	cbStateChangeFunc := func(name, from, to string) {
		// 状态变更时的处理逻辑
	}
	r, err := rabbus.New(
		rabbusDsn,
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
		rabbus.Threshold(3),
		rabbus.OnStateChange(cbStateChangeFunc),
	)
	if err != nil {
		// 处理错误
	}

	defer func(r Rabbus) {
		if err := r.Close(); err != nil {
			// 处理错误
		}
	}(r)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go r.Run(ctx)

	// 监听消息配置
	messages, err := r.Listen(rabbus.ListenConfig{
		Exchange:    "events_ex",   // 交换机名称
		Kind:        "topic",       // 交换机类型
		Key:         "events_key",  // 路由键
		Queue:       "events_q",    // 队列名称
		DeclareArgs: rabbus.NewDeclareArgs().WithMessageTTL(15 * time.Minute).With("foo", "bar"), // 声明参数
		BindArgs:    rabbus.NewBindArgs().With("baz", "qux"), // 绑定参数
	})
	if err != nil {
		// 处理添加监听器时的错误
	}
	defer close(messages)

	// 处理接收到的消息
	go func(messages chan ConsumerMessage) {
		for m := range messages {
			m.Ack(false) // 确认消息
		}
	}(messages)
}

贡献指南

  1. Fork项目
  2. 创建特性分支 (git checkout -b my-new-feature)
  3. 提交更改 (git commit -am 'Add some feature')
  4. 推送到分支 (git push origin my-new-feature)
  5. 创建Pull Request

项目状态

[构建状态] [Go报告卡] [Go文档]


更多关于golang AMQP交换机和队列轻量级封装插件库rabbus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang AMQP交换机和队列轻量级封装插件库rabbus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


RabbitBus - Golang AMQP 轻量级封装库使用指南

RabbitBus 是一个轻量级的 Golang AMQP (RabbitMQ) 封装库,提供了简单易用的 API 来处理消息队列的发布和消费。下面我将详细介绍其使用方法。

安装

go get github.com/rafaeljesus/rabbus

基本使用

1. 初始化连接

package main

import (
	"log"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	rb, err := rabbus.New(
		"amqp://guest:guest@localhost:5672/",
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
		rabbus.Timeout(time.Second*10),
	)
	if err != nil {
		log.Fatalf("Failed to init rabbus: %v", err)
	}
	defer rb.Close()
	
	// 其他操作...
}

2. 声明交换机和队列

// 声明交换机
err := rb.DeclareExchange(rabbus.ExchangeConfig{
    Name:       "test_exchange",
    Kind:       "direct",
    Durable:    true,
    AutoDelete: false,
})
if err != nil {
    log.Fatalf("Failed to declare exchange: %v", err)
}

// 声明队列
err = rb.DeclareQueue(rabbus.QueueConfig{
    Name:       "test_queue",
    Durable:    true,
    AutoDelete: false,
})
if err != nil {
    log.Fatalf("Failed to declare queue: %v", err)
}

// 绑定队列到交换机
err = rb.BindQueue(rabbus.BindConfig{
    Exchange:   "test_exchange",
    Queue:      "test_queue",
    RoutingKey: "test_routing_key",
})
if err != nil {
    log.Fatalf("Failed to bind queue: %v", err)
}

3. 发布消息

// 发布消息
msg := rabbus.Message{
    Exchange:     "test_exchange",
    Key:          "test_routing_key",
    Payload:      []byte("Hello RabbitMQ"),
    DeliveryMode: rabbus.Persistent,
}

select {
case rb.Emit() <- msg:
    log.Println("Message published")
case <-time.After(time.Second):
    log.Println("Publish timeout")
}

4. 消费消息

// 消费消息
consumer, err := rb.Consumer("test_queue")
if err != nil {
    log.Fatalf("Failed to create consumer: %v", err)
}

go func() {
    for msg := range consumer {
        log.Printf("Received message: %s", string(msg.Body))
        msg.Ack(false) // 确认消息
    }
}()

// 保持程序运行
select {}

高级特性

1. 重试机制

RabbitBus 内置了重试机制,可以在连接失败时自动重试:

rb, err := rabbus.New(
    "amqp://guest:guest@localhost:5672/",
    rabbus.Attempts(3),          // 最大重试次数
    rabbus.Sleep(time.Second*5), // 重试间隔
)

2. 消息确认模式

// 自动确认模式
consumer, err := rb.Consumer("test_queue", rabbus.AutoAck(true))

// 手动确认模式
consumer, err := rb.Consumer("test_queue")
for msg := range consumer {
    // 处理消息
    if err := process(msg); err != nil {
        msg.Nack(false, true) // 拒绝并重新入队
    } else {
        msg.Ack(false)       // 确认处理完成
    }
}

3. 消息优先级

msg := rabbus.Message{
    Exchange:     "test_exchange",
    Key:          "test_routing_key",
    Payload:      []byte("High priority message"),
    Priority:     5, // 优先级 0-9
}

4. 死信队列配置

// 声明死信队列
err := rb.DeclareQueue(rabbus.QueueConfig{
    Name:       "dlx_queue",
    Durable:    true,
    Arguments: rabbus.QueueArguments{
        "x-message-ttl":          60000, // 60秒TTL
        "x-dead-letter-exchange": "dlx_exchange",
    },
})

完整示例

package main

import (
	"log"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	// 初始化连接
	rb, err := rabbus.New(
		"amqp://guest:guest@localhost:5672/",
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer rb.Close()

	// 声明交换机
	if err := rb.DeclareExchange(rabbus.ExchangeConfig{
		Name:    "test_ex",
		Kind:    "direct",
		Durable: true,
	}); err != nil {
		log.Fatal(err)
	}

	// 声明队列
	if err := rb.DeclareQueue(rabbus.QueueConfig{
		Name:    "test_q",
		Durable: true,
	}); err != nil {
		log.Fatal(err)
	}

	// 绑定队列
	if err := rb.BindQueue(rabbus.BindConfig{
		Exchange:   "test_ex",
		Queue:      "test_q",
		RoutingKey: "test_key",
	}); err != nil {
		log.Fatal(err)
	}

	// 发布消息
	go func() {
		for i := 0; i < 10; i++ {
			msg := rabbus.Message{
				Exchange: "test_ex",
				Key:      "test_key",
				Payload:  []byte("message " + string(i)),
			}

			select {
			case rb.Emit() <- msg:
				log.Printf("Published message %d", i)
			case <-time.After(time.Second):
				log.Println("Publish timeout")
			}

			time.Sleep(time.Second)
		}
	}()

	// 消费消息
	consumer, err := rb.Consumer("test_q")
	if err != nil {
		log.Fatal(err)
	}

	for msg := range consumer {
		log.Printf("Received: %s", string(msg.Body))
		msg.Ack(false)
	}
}

总结

RabbitBus 提供了以下主要优势:

  1. 简洁的 API 设计
  2. 自动重连机制
  3. 灵活的配置选项
  4. 支持多种消息模式
  5. 轻量级无额外依赖

对于需要简单可靠地集成 RabbitMQ 的 Golang 应用来说,RabbitBus 是一个不错的选择。

回到顶部