golang简单易用的RabbitMQ消息队列管理及发布订阅插件库jazz的使用

Golang简单易用的RabbitMQ消息队列管理及发布订阅插件库Jazz的使用

Jazz是一个用于快速简单连接RabbitMQ、消息传递和管理的抽象层,灵感来源于Jazz Jackrabbit和他对慢乌龟的永恒仇恨。

Jazz Jackrabbit

使用说明

该库包含三个主要部分:交换/队列方案创建、消息发布和消息消费。这种划分的最大好处是每个部分可以在单独的应用程序中。由于有专门的管理部分,消息的发布和消费被极大地简化了。

步骤1:连接到RabbitMQ

import(
	"github.com/socifi/jazz"
)

var dsn = "amqp://guest:guest@localhost:5672/"

func main() {
	// ...

	c, err := jazz.Connect(dsn)
	if err != nil {
		t.Errorf("Could not connect to RabbitMQ: %v", err.Error())
		return
	}

	//...
}

步骤2:创建方案

方案规范通过Settings结构完成,可以轻松在YAML中指定。通常你需要解码YAML,然后创建所有队列和交换。

var data = []byte(`
exchanges:
  exchange0:
    durable: true
    type: topic
  exchange1:
    durable: true
    type: topic
    bindings:
      - exchange: "exchange0"
        key: "key1"
      - exchange: "exchange0"
        key: "key2"
  exchange2:
    durable: true
    type: topic
    bindings:
      - exchange: "exchange0"
        key: "key3"
      - exchange: "exchange1"
        key: "key2"
  exchange3:
    durable: true
    type: topic
    bindings:
      - exchange: "exchange0"
        key: "key4"
queues:
  queue0:
    durable: true
    bindings:
      - exchange: "exchange0"
        key: "key4"
  queue1:
    durable: true
    bindings:
      - exchange: "exchange1"
        key: "key2"
  queue2:
    durable: true
    bindings:
      - exchange: "exchange1"
        key: "#"
  queue3:
    durable: true
    bindings:
      - exchange: "exchange2"
        key: "#"
  queue4:
    durable: true
    bindings:
      - exchange: "exchange3"
        key: "#"
  queue5:
    durable: true
    bindings:
      - exchange: "exchange0"
        key: "#"
`)

func main() {
	// ...

	reader := bytes.NewReader(data)
	scheme, err := DecodeYaml(reader)
	if err != nil {
		t.Errorf("Could not read YAML: %v", err.Error())
		return
	}

	err = c.CreateScheme(scheme)
	if err != nil {
		t.Errorf("Could not create scheme: %v", err.Error())
		return
	}

	//...

	// 友好地删除方案(不建议在生产环境中使用)
	err = c.DeleteScheme(scheme)
	if err != nil {
		t.Errorf("Could not delete scheme: %v", err.Error())
		return
	}
}

步骤3:发布和/或消费消息

你可以在单独的应用程序中处理每个队列,或者像这样一起处理所有队列:

func main() {
	// ...

	f := func(msg []byte) {
		fmt.Println(string(msg))
	}

	go c.ProcessQueue("queue1", f)
	go c.ProcessQueue("queue2", f)
	go c.ProcessQueue("queue3", f)
	go c.ProcessQueue("queue4", f)
	go c.ProcessQueue("queue5", f)
	go c.ProcessQueue("queue6", f)
	c.SendMessage("exchange0", "key1", "Hello World!")
	c.SendMessage("exchange0", "key2", "Hello!")
	c.SendMessage("exchange0", "key3", "World!")
	c.SendMessage("exchange0", "key4", "Hi!")
	c.SendMessage("exchange0", "key5", "Again!")

	//...
}

注意事项

无意侵犯版权。Jazz Jackrabbit的名称和艺术作品是Epic MegaGames的知识产权,取自维基百科


更多关于golang简单易用的RabbitMQ消息队列管理及发布订阅插件库jazz的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang简单易用的RabbitMQ消息队列管理及发布订阅插件库jazz的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang中使用jazz库管理RabbitMQ消息队列

jazz是一个简单易用的Golang RabbitMQ客户端库,提供了简洁的API来进行消息队列管理和发布订阅模式实现。下面我将介绍如何使用jazz库。

安装jazz

首先安装jazz库:

go get github.com/soulmachine/jazz

基本使用

1. 连接到RabbitMQ

package main

import (
	"log"
	"github.com/soulmachine/jazz"
)

func main() {
	// 连接到RabbitMQ服务器
	conn, err := jazz.Connect("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()
	
	// 创建通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()
}

2. 发布消息

func publishMessage(ch *jazz.Channel) {
	// 声明队列
	q, err := ch.QueueDeclare(
		"hello", // 队列名
		false,   // 是否持久化
		false,   // 是否自动删除
		false,   // 是否排他
		false,   // 是否等待
		nil,     // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 发布消息
	err = ch.Publish(
		"",     // 交换机
		q.Name, // 路由键
		false,  // 是否强制
		false,  // 是否立即
		jazz.Publishing{
			ContentType: "text/plain",
			Body:        []byte("Hello World!"),
		})
	if err != nil {
		log.Fatalf("Failed to publish a message: %v", err)
	}
	log.Println("Message published")
}

3. 消费消息

func consumeMessages(ch *jazz.Channel) {
	// 声明队列
	q, err := ch.QueueDeclare(
		"hello", // 队列名
		false,   // 是否持久化
		false,   // 是否自动删除
		false,   // 是否排他
		false,   // 是否等待
		nil,     // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 消费消息
	msgs, err := ch.Consume(
		q.Name, // 队列
		"",     // 消费者标识
		true,   // 自动应答
		false,  // 是否排他
		false,  // 是否不等待
		false,  // 是否不等待
		nil,    // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	// 处理消息
	forever := make(chan bool)
	
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()
	
	log.Println("Waiting for messages...")
	<-forever
}

发布/订阅模式

1. 发布消息到交换机

func publishToExchange(ch *jazz.Channel) {
	// 声明交换机
	err := ch.ExchangeDeclare(
		"logs",   // 交换机名
		"fanout", // 交换机类型
		true,     // 是否持久化
		false,    // 是否自动删除
		false,    // 是否内部
		false,    // 是否等待
		nil,      // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare an exchange: %v", err)
	}

	// 发布消息到交换机
	err = ch.Publish(
		"logs", // 交换机
		"",     // 路由键
		false,  // 是否强制
		false,  // 是否立即
		jazz.Publishing{
			ContentType: "text/plain",
			Body:        []byte("Broadcast message"),
		})
	if err != nil {
		log.Fatalf("Failed to publish a message: %v", err)
	}
	log.Println("Message published to exchange")
}

2. 订阅交换机消息

func subscribeToExchange(ch *jazz.Channel) {
	// 声明交换机
	err := ch.ExchangeDeclare(
		"logs",   // 交换机名
		"fanout", // 交换机类型
		true,     // 是否持久化
		false,    // 是否自动删除
		false,    // 是否内部
		false,    // 是否等待
		nil,      // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare an exchange: %v", err)
	}

	// 声明临时队列
	q, err := ch.QueueDeclare(
		"",    // 队列名(空表示让服务器生成)
		false, // 是否持久化
		false, // 是否自动删除
		true,  // 是否排他
		false, // 是否等待
		nil,   // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name, // 队列名
		"",     // 路由键
		"logs", // 交换机名
		false,  // 是否等待
		nil,    // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to bind a queue: %v", err)
	}

	// 消费消息
	msgs, err := ch.Consume(
		q.Name, // 队列
		"",     // 消费者标识
		true,   // 自动应答
		false,  // 是否排他
		false,  // 是否不等待
		false,  // 是否不等待
		nil,    // 额外参数
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	forever := make(chan bool)
	
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()
	
	log.Println("Waiting for messages...")
	<-forever
}

高级特性

消息确认

func consumeWithAck(ch *jazz.Channel) {
	q, err := ch.QueueDeclare(
		"task_queue",
		true,  // 持久化队列
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 设置预取计数为1
	err = ch.Qos(
		1,     // 预取计数
		0,     // 预取大小
		false, // 全局
	)
	if err != nil {
		log.Fatalf("Failed to set QoS: %v", err)
	}

	msgs, err := ch.Consume(
		q.Name,
		"",
		false, // 手动确认
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	forever := make(chan bool)
	
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			// 模拟处理耗时
			time.Sleep(time.Second)
			log.Printf("Done")
			d.Ack(false) // 确认消息
		}
	}()
	
	log.Println("Waiting for messages...")
	<-forever
}

使用不同的交换机类型

func useDirectExchange(ch *jazz.Channel) {
	// 声明直连交换机
	err := ch.ExchangeDeclare(
		"direct_logs",
		"direct",
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("Failed to declare an exchange: %v", err)
	}

	// 发布消息到特定路由键
	err = ch.Publish(
		"direct_logs",
		"error", // 路由键
		false,
		false,
		jazz.Publishing{
			ContentType: "text/plain",
			Body:        []byte("Error message"),
		})
	if err != nil {
		log.Fatalf("Failed to publish a message: %v", err)
	}
}

jazz库提供了简洁的API来管理RabbitMQ,支持各种消息队列模式和高级特性。通过上述示例,您可以快速开始使用jazz构建基于RabbitMQ的消息系统。

回到顶部