golang RabbitMQ消息队列客户端插件库amqp的使用

Golang RabbitMQ消息队列客户端插件库amqp的使用

介绍

这是一个由RabbitMQ核心团队维护的Go AMQP 0.9.1客户端库,最初由Sean Treadway开发。

与streadway/amqp的区别

包名

这个库使用了不同的包名。如果从streadway/amqp迁移,可以使用别名来减少需要的更改:

amqp "github.com/rabbitmq/amqp091-go"

许可证

这个客户端使用与原始项目相同的2条款BSD许可证。

公共API演进

这个客户端尽可能保留了关键的API元素,但对社区建议的合理的破坏性公共API更改持开放态度。

项目成熟度

这个项目基于一个已经存在十多年的成熟Go客户端。

支持的Go版本

这个客户端支持最近的两个Go发布系列。

支持的RabbitMQ版本

这个项目支持从2.0开始的RabbitMQ版本,但主要针对当前支持的RabbitMQ发布系列进行测试。

使用示例

生产者示例

package main

import (
	"log"
	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"hello", // 队列名
		false,   // 持久化
		false,   // 不使用时删除
		false,   // 排他队列
		false,   // 不等待
		nil,     // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 发布消息
	body := "Hello World!"
	err = ch.Publish(
		"",     // 交换机
		q.Name, // 路由键
		false,  // 强制
		false,  // 立即
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s\n", body)
}

消费者示例

package main

import (
	"log"
	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"hello", // 队列名
		false,   // 持久化
		false,   // 不使用时删除
		false,   // 排他队列
		false,   // 不等待
		nil,     // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 注册消费者
	msgs, err := ch.Consume(
		q.Name, // 队列
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 排他
		false,  // 不等待
		false,  // 不等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

注意事项

  1. 这个客户端不支持自动重连和客户端与服务器拓扑的重新同步
  2. 不支持AMQP协议协商以实现向前或向后兼容
  3. 仅支持PLAIN和EXTERNAL认证机制
  4. 不支持basic.returnbasic.ack帧排序保证

贡献

欢迎提交Pull Request。请在非主分支上创建您的Pull Request,确保包含覆盖您更改的测试或示例,并且您的提交代表连贯的更改,包括更改的原因。


更多关于golang RabbitMQ消息队列客户端插件库amqp的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang RabbitMQ消息队列客户端插件库amqp的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang RabbitMQ客户端库amqp使用指南

RabbitMQ是一个流行的开源消息代理软件,而Go语言中的streadway/amqp库是与RabbitMQ交互的标准方式。下面我将详细介绍如何使用这个库。

1. 安装amqp库

首先需要安装amqp库:

go get github.com/streadway/amqp

2. 基本连接和通道创建

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
}

3. 发送消息

// 声明队列
q, err := ch.QueueDeclare(
	"hello", // 队列名称
	false,   // 是否持久化
	false,   // 是否自动删除
	false,   // 是否排他
	false,   // 是否等待
	nil,     // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 发布消息
body := "Hello World!"
err = ch.Publish(
	"",     // 交换机
	q.Name, // 路由键
	false,  // 强制
	false,  // 立即
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

4. 接收消息

// 声明队列(与发送方相同)
q, err := ch.QueueDeclare(
	"hello", // 队列名称
	false,   // 是否持久化
	false,   // 是否自动删除
	false,   // 是否排他
	false,   // 是否等待
	nil,     // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 注册消费者
msgs, err := ch.Consume(
	q.Name, // 队列
	"",     // 消费者标签
	true,   // 自动应答
	false,  // 排他
	false,  // 不等待
	false,  // 无额外参数
	nil,
)
failOnError(err, "Failed to register a consumer")

// 处理消息
forever := make(chan bool)

go func() {
	for d := range msgs {
		log.Printf("Received a message: %s", d.Body)
	}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

5. 高级特性

5.1 消息确认

// 消费者端关闭自动应答
msgs, err := ch.Consume(
	q.Name, // 队列
	"",     // 消费者标签
	false,  // 关闭自动应答
	false,  // 排他
	false,  // 不等待
	false,  // 无额外参数
	nil,
)

// 处理消息时手动确认
for d := range msgs {
	log.Printf("Received a message: %s", d.Body)
	// 模拟处理
	time.Sleep(1 * time.Second)
	d.Ack(false) // 手动确认消息
}

5.2 交换机使用

// 声明交换机
err = ch.ExchangeDeclare(
	"logs",   // 名称
	"fanout", // 类型
	true,     // 持久化
	false,    // 自动删除
	false,    // 内部
	false,    // 不等待
	nil,      // 参数
)
failOnError(err, "Failed to declare an exchange")

// 发布到交换机
err = ch.Publish(
	"logs", // 交换机名称
	"",     // 路由键
	false,  // 强制
	false,  // 立即
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})

5.3 队列绑定

// 消费者绑定队列到交换机
err = ch.QueueBind(
	q.Name, // 队列名称
	"",     // 路由键
	"logs", // 交换机名称
	false,
	nil,
)
failOnError(err, "Failed to bind a queue")

6. 错误处理和重连

RabbitMQ连接可能会中断,需要实现重连机制:

func connect() (*amqp.Connection, error) {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		return nil, err
	}
	
	// 监听连接关闭事件
	go func() {
		<-conn.NotifyClose(make(chan *amqp.Error))
		log.Println("Connection closed, reconnecting...")
		time.Sleep(5 * time.Second)
		conn = connect()
	}()
	
	return conn, nil
}

7. 最佳实践

  1. 连接复用:一个应用通常只需要一个连接,多个通道
  2. 通道复用:尽量复用通道而非频繁创建销毁
  3. 错误处理:妥善处理连接中断和通道错误
  4. 资源清理:确保关闭连接和通道
  5. QoS设置:合理设置预取计数
// 设置QoS
err = ch.Qos(
	1,     // 预取计数
	0,     // 预取大小
	false, // 全局设置
)
failOnError(err, "Failed to set QoS")

以上是streadway/amqp库的基本使用方法。根据实际需求,你可以进一步探索RabbitMQ的高级特性如死信队列、优先级队列等。

回到顶部