golang RabbitMQ消息队列客户端插件库amqp的使用
Golang RabbitMQ消息队列客户端插件库amqp的使用
介绍
这是一个由RabbitMQ核心团队维护的Go AMQP 0.9.1客户端库,最初由Sean Treadway开发。
与streadway/amqp的区别
包名
这个库使用了不同的包名。如果从streadway/amqp
迁移,可以使用别名来减少需要的更改:
amqp "github.com/rabbitmq/amqp091-go"
许可证
这个客户端使用与原始项目相同的2条款BSD许可证。
公共API演进
这个客户端尽可能保留了关键的API元素,但对社区建议的合理的破坏性公共API更改持开放态度。
项目成熟度
这个项目基于一个已经存在十多年的成熟Go客户端。
支持的Go版本
这个客户端支持最近的两个Go发布系列。
支持的RabbitMQ版本
这个项目支持从2.0
开始的RabbitMQ版本,但主要针对当前支持的RabbitMQ发布系列进行测试。
使用示例
生产者示例
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名
false, // 持久化
false, // 不使用时删除
false, // 排他队列
false, // 不等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 发布消息
body := "Hello World!"
err = ch.Publish(
"", // 交换机
q.Name, // 路由键
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s\n", body)
}
消费者示例
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名
false, // 持久化
false, // 不使用时删除
false, // 排他队列
false, // 不等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 注册消费者
msgs, err := ch.Consume(
q.Name, // 队列
"", // 消费者标签
true, // 自动确认
false, // 排他
false, // 不等待
false, // 不等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
注意事项
- 这个客户端不支持自动重连和客户端与服务器拓扑的重新同步
- 不支持AMQP协议协商以实现向前或向后兼容
- 仅支持PLAIN和EXTERNAL认证机制
- 不支持
basic.return
和basic.ack
帧排序保证
贡献
欢迎提交Pull Request。请在非主分支上创建您的Pull Request,确保包含覆盖您更改的测试或示例,并且您的提交代表连贯的更改,包括更改的原因。
更多关于golang RabbitMQ消息队列客户端插件库amqp的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang RabbitMQ消息队列客户端插件库amqp的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang RabbitMQ客户端库amqp使用指南
RabbitMQ是一个流行的开源消息代理软件,而Go语言中的streadway/amqp
库是与RabbitMQ交互的标准方式。下面我将详细介绍如何使用这个库。
1. 安装amqp库
首先需要安装amqp库:
go get github.com/streadway/amqp
2. 基本连接和通道创建
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
}
3. 发送消息
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 发布消息
body := "Hello World!"
err = ch.Publish(
"", // 交换机
q.Name, // 路由键
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
4. 接收消息
// 声明队列(与发送方相同)
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 注册消费者
msgs, err := ch.Consume(
q.Name, // 队列
"", // 消费者标签
true, // 自动应答
false, // 排他
false, // 不等待
false, // 无额外参数
nil,
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
5. 高级特性
5.1 消息确认
// 消费者端关闭自动应答
msgs, err := ch.Consume(
q.Name, // 队列
"", // 消费者标签
false, // 关闭自动应答
false, // 排他
false, // 不等待
false, // 无额外参数
nil,
)
// 处理消息时手动确认
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟处理
time.Sleep(1 * time.Second)
d.Ack(false) // 手动确认消息
}
5.2 交换机使用
// 声明交换机
err = ch.ExchangeDeclare(
"logs", // 名称
"fanout", // 类型
true, // 持久化
false, // 自动删除
false, // 内部
false, // 不等待
nil, // 参数
)
failOnError(err, "Failed to declare an exchange")
// 发布到交换机
err = ch.Publish(
"logs", // 交换机名称
"", // 路由键
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
5.3 队列绑定
// 消费者绑定队列到交换机
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键
"logs", // 交换机名称
false,
nil,
)
failOnError(err, "Failed to bind a queue")
6. 错误处理和重连
RabbitMQ连接可能会中断,需要实现重连机制:
func connect() (*amqp.Connection, error) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
return nil, err
}
// 监听连接关闭事件
go func() {
<-conn.NotifyClose(make(chan *amqp.Error))
log.Println("Connection closed, reconnecting...")
time.Sleep(5 * time.Second)
conn = connect()
}()
return conn, nil
}
7. 最佳实践
- 连接复用:一个应用通常只需要一个连接,多个通道
- 通道复用:尽量复用通道而非频繁创建销毁
- 错误处理:妥善处理连接中断和通道错误
- 资源清理:确保关闭连接和通道
- QoS设置:合理设置预取计数
// 设置QoS
err = ch.Qos(
1, // 预取计数
0, // 预取大小
false, // 全局设置
)
failOnError(err, "Failed to set QoS")
以上是streadway/amqp
库的基本使用方法。根据实际需求,你可以进一步探索RabbitMQ的高级特性如死信队列、优先级队列等。