golang简单易用的RabbitMQ消息队列管理及发布订阅插件库jazz的使用
Golang简单易用的RabbitMQ消息队列管理及发布订阅插件库Jazz的使用
Jazz是一个用于快速简单连接RabbitMQ、消息传递和管理的抽象层,灵感来源于Jazz Jackrabbit和他对慢乌龟的永恒仇恨。
使用说明
该库包含三个主要部分:交换/队列方案创建、消息发布和消息消费。这种划分的最大好处是每个部分可以在单独的应用程序中。由于有专门的管理部分,消息的发布和消费被极大地简化了。
步骤1:连接到RabbitMQ
import(
"github.com/socifi/jazz"
)
var dsn = "amqp://guest:guest@localhost:5672/"
func main() {
// ...
c, err := jazz.Connect(dsn)
if err != nil {
t.Errorf("Could not connect to RabbitMQ: %v", err.Error())
return
}
//...
}
步骤2:创建方案
方案规范通过Settings
结构完成,可以轻松在YAML中指定。通常你需要解码YAML,然后创建所有队列和交换。
var data = []byte(`
exchanges:
exchange0:
durable: true
type: topic
exchange1:
durable: true
type: topic
bindings:
- exchange: "exchange0"
key: "key1"
- exchange: "exchange0"
key: "key2"
exchange2:
durable: true
type: topic
bindings:
- exchange: "exchange0"
key: "key3"
- exchange: "exchange1"
key: "key2"
exchange3:
durable: true
type: topic
bindings:
- exchange: "exchange0"
key: "key4"
queues:
queue0:
durable: true
bindings:
- exchange: "exchange0"
key: "key4"
queue1:
durable: true
bindings:
- exchange: "exchange1"
key: "key2"
queue2:
durable: true
bindings:
- exchange: "exchange1"
key: "#"
queue3:
durable: true
bindings:
- exchange: "exchange2"
key: "#"
queue4:
durable: true
bindings:
- exchange: "exchange3"
key: "#"
queue5:
durable: true
bindings:
- exchange: "exchange0"
key: "#"
`)
func main() {
// ...
reader := bytes.NewReader(data)
scheme, err := DecodeYaml(reader)
if err != nil {
t.Errorf("Could not read YAML: %v", err.Error())
return
}
err = c.CreateScheme(scheme)
if err != nil {
t.Errorf("Could not create scheme: %v", err.Error())
return
}
//...
// 友好地删除方案(不建议在生产环境中使用)
err = c.DeleteScheme(scheme)
if err != nil {
t.Errorf("Could not delete scheme: %v", err.Error())
return
}
}
步骤3:发布和/或消费消息
你可以在单独的应用程序中处理每个队列,或者像这样一起处理所有队列:
func main() {
// ...
f := func(msg []byte) {
fmt.Println(string(msg))
}
go c.ProcessQueue("queue1", f)
go c.ProcessQueue("queue2", f)
go c.ProcessQueue("queue3", f)
go c.ProcessQueue("queue4", f)
go c.ProcessQueue("queue5", f)
go c.ProcessQueue("queue6", f)
c.SendMessage("exchange0", "key1", "Hello World!")
c.SendMessage("exchange0", "key2", "Hello!")
c.SendMessage("exchange0", "key3", "World!")
c.SendMessage("exchange0", "key4", "Hi!")
c.SendMessage("exchange0", "key5", "Again!")
//...
}
注意事项
无意侵犯版权。Jazz Jackrabbit的名称和艺术作品是Epic MegaGames的知识产权,取自维基百科
更多关于golang简单易用的RabbitMQ消息队列管理及发布订阅插件库jazz的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang简单易用的RabbitMQ消息队列管理及发布订阅插件库jazz的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang中使用jazz库管理RabbitMQ消息队列
jazz是一个简单易用的Golang RabbitMQ客户端库,提供了简洁的API来进行消息队列管理和发布订阅模式实现。下面我将介绍如何使用jazz库。
安装jazz
首先安装jazz库:
go get github.com/soulmachine/jazz
基本使用
1. 连接到RabbitMQ
package main
import (
"log"
"github.com/soulmachine/jazz"
)
func main() {
// 连接到RabbitMQ服务器
conn, err := jazz.Connect("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
}
2. 发布消息
func publishMessage(ch *jazz.Channel) {
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名
false, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 发布消息
err = ch.Publish(
"", // 交换机
q.Name, // 路由键
false, // 是否强制
false, // 是否立即
jazz.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
})
if err != nil {
log.Fatalf("Failed to publish a message: %v", err)
}
log.Println("Message published")
}
3. 消费消息
func consumeMessages(ch *jazz.Channel) {
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名
false, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列
"", // 消费者标识
true, // 自动应答
false, // 是否排他
false, // 是否不等待
false, // 是否不等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Println("Waiting for messages...")
<-forever
}
发布/订阅模式
1. 发布消息到交换机
func publishToExchange(ch *jazz.Channel) {
// 声明交换机
err := ch.ExchangeDeclare(
"logs", // 交换机名
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部
false, // 是否等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %v", err)
}
// 发布消息到交换机
err = ch.Publish(
"logs", // 交换机
"", // 路由键
false, // 是否强制
false, // 是否立即
jazz.Publishing{
ContentType: "text/plain",
Body: []byte("Broadcast message"),
})
if err != nil {
log.Fatalf("Failed to publish a message: %v", err)
}
log.Println("Message published to exchange")
}
2. 订阅交换机消息
func subscribeToExchange(ch *jazz.Channel) {
// 声明交换机
err := ch.ExchangeDeclare(
"logs", // 交换机名
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部
false, // 是否等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %v", err)
}
// 声明临时队列
q, err := ch.QueueDeclare(
"", // 队列名(空表示让服务器生成)
false, // 是否持久化
false, // 是否自动删除
true, // 是否排他
false, // 是否等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 绑定队列到交换机
err = ch.QueueBind(
q.Name, // 队列名
"", // 路由键
"logs", // 交换机名
false, // 是否等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to bind a queue: %v", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列
"", // 消费者标识
true, // 自动应答
false, // 是否排他
false, // 是否不等待
false, // 是否不等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Println("Waiting for messages...")
<-forever
}
高级特性
消息确认
func consumeWithAck(ch *jazz.Channel) {
q, err := ch.QueueDeclare(
"task_queue",
true, // 持久化队列
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 设置预取计数为1
err = ch.Qos(
1, // 预取计数
0, // 预取大小
false, // 全局
)
if err != nil {
log.Fatalf("Failed to set QoS: %v", err)
}
msgs, err := ch.Consume(
q.Name,
"",
false, // 手动确认
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟处理耗时
time.Sleep(time.Second)
log.Printf("Done")
d.Ack(false) // 确认消息
}
}()
log.Println("Waiting for messages...")
<-forever
}
使用不同的交换机类型
func useDirectExchange(ch *jazz.Channel) {
// 声明直连交换机
err := ch.ExchangeDeclare(
"direct_logs",
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %v", err)
}
// 发布消息到特定路由键
err = ch.Publish(
"direct_logs",
"error", // 路由键
false,
false,
jazz.Publishing{
ContentType: "text/plain",
Body: []byte("Error message"),
})
if err != nil {
log.Fatalf("Failed to publish a message: %v", err)
}
}
jazz库提供了简洁的API来管理RabbitMQ,支持各种消息队列模式和高级特性。通过上述示例,您可以快速开始使用jazz构建基于RabbitMQ的消息系统。