golang轻量级本地消息队列管理插件库RapidMQ的使用

Golang轻量级本地消息队列管理插件库RapidMQ的使用

简介

RapidMQ是一个纯Go语言实现的、高效、轻量级且可靠的本地消息队列管理库。

Go Report Card Build Status Coverage Status GoDoc

安装

go get github.com/sybrexsys/RapidMQ/queue

要求

  • 需要Go 1.4或更高版本

使用示例

基本队列操作

package main

import (
	"github.com/sybrexsys/RapidMQ/queue"
	"log"
)

// 自定义日志实现
type MyLogger struct{}

func (l *MyLogger) Trace(msg string, a ...interface{}) {
	log.Printf("[TRACE] "+msg, a...)
}

func (l *MyLogger) Info(msg string, a ...interface{}) {
	log.Printf("[INFO] "+msg, a...)
}

func (l *MyLogger) Warning(msg string, a ...interface{}) {
	log.Printf("[WARN] "+msg, a...)
}

func (l *MyLogger) Error(msg string, a ...interface{}) {
	log.Printf("[ERROR] "+msg, a...)
}

// 自定义Worker实现
type MyWorker struct {
	id WorkerID
}

func (w *MyWorker) ProcessMessage(q *queue.Queue, msg *queue.Message, ch chan Worker) {
	log.Printf("Processing message ID: %d, Content: %s", msg.ID, string(msg.Buffer))
	
	// 处理完成后通知队列
	q.Process(w.id, true)
	ch <- w
}

func (w *MyWorker) ProcessTimeout(q *queue.Queue, ch chan Worker) {
	// 超时处理
	q.Process(w.id, true)
	ch <- w
}

func (w *MyWorker) GetID() WorkerID {
	return w.id
}

func (w *MyWorker) Close() {
	log.Printf("Worker %d closed", w.id)
}

// 自定义WorkerFactory实现
type MyWorkerFactory struct{}

func (f *MyWorkerFactory) CreateWorker() Worker {
	return &MyWorker{id: WorkerID(time.Now().UnixNano())}
}

func (f *MyWorkerFactory) NeedTimeoutProcessing() bool {
	return false
}

func main() {
	// 创建队列
	q, err := queue.CreateQueue(
		"MyQueue",              // 队列名称
		"./queue_storage",      // 存储路径
		&MyLogger{},            // 日志记录器
		&MyWorkerFactory{},     // Worker工厂
		nil,                    // 选项(使用默认)
	)
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	// 插入消息
	message := []byte("Hello, RapidMQ!")
	if !q.Insert(message) {
		log.Println("Failed to insert message")
	}

	// 获取队列中的消息数量
	count := q.Count()
	log.Printf("Queue contains %d messages", count)
}

消息结构

type Message struct {
	ID      StorageIdx  // 消息ID
	Buffer  []byte      // 消息内容缓冲区
}

WorkerFactory接口

type WorkerFactory interface {
	CreateWorker() Worker          // 创建新Worker
	NeedTimeoutProcessing() bool   // 是否需要超时处理
}

Worker接口

type Worker interface {
	ProcessMessage(*Queue, *Message, chan Worker)  // 处理消息
	ProcessTimeout(*Queue, chan Worker)            // 处理超时
	GetID() WorkerID                               // 获取Worker ID
	Close()                                        // 关闭Worker
}

日志接口

type Logging interface {
	Trace(msg string, a ...interface{})    // 跟踪日志
	Info(msg string, a ...interface{})     // 信息日志
	Warning(msg string, a ...interface{})  // 警告日志
	Error(msg string, a ...interface{})    // 错误日志
}

作者

Vadim Shakun: vadim.shakun@gmail.com

许可证

RapidMQ使用Apache 2.0许可证。


更多关于golang轻量级本地消息队列管理插件库RapidMQ的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级本地消息队列管理插件库RapidMQ的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


RapidMQ - Golang轻量级本地消息队列管理库

RapidMQ是一个轻量级的本地消息队列管理库,专为Golang设计,适用于需要简单高效消息队列功能的场景。下面我将详细介绍其使用方法和示例代码。

主要特性

  • 轻量级设计,无外部依赖
  • 支持多队列管理
  • 简单的生产/消费模型
  • 内存存储,高性能
  • 线程安全

安装

go get github.com/sybrexsys/RapidMQ

基本使用示例

1. 创建队列管理器

package main

import (
	"fmt"
	"github.com/sybrexsys/RapidMQ"
)

func main() {
	// 创建队列管理器
	manager := rapidmq.NewManager()
	defer manager.Close()
	
	// 创建一个名为"test"的队列
	queue, err := manager.CreateQueue("test")
	if err != nil {
		fmt.Println("创建队列失败:", err)
		return
	}
	
	// 生产消息
	err = queue.Put("Hello, RapidMQ!")
	if err != nil {
		fmt.Println("生产消息失败:", err)
		return
	}
	
	// 消费消息
	msg, err := queue.Get()
	if err != nil {
		fmt.Println("消费消息失败:", err)
		return
	}
	
	fmt.Println("收到消息:", msg)
}

2. 多队列管理

func multiQueueExample() {
	manager := rapidmq.NewManager()
	defer manager.Close()
	
	// 创建多个队列
	queue1, _ := manager.CreateQueue("queue1")
	queue2, _ := manager.CreateQueue("queue2")
	
	// 向不同队列发送消息
	queue1.Put("消息1")
	queue2.Put("消息2")
	
	// 从不同队列获取消息
	msg1, _ := queue1.Get()
	msg2, _ := queue2.Get()
	
	fmt.Println("queue1:", msg1)
	fmt.Println("queue2:", msg2)
}

3. 批量操作

func batchOperations() {
	manager := rapidmq.NewManager()
	defer manager.Close()
	
	queue, _ := manager.CreateQueue("batch_queue")
	
	// 批量生产消息
	for i := 0; i < 10; i++ {
		queue.Put(fmt.Sprintf("消息%d", i))
	}
	
	// 批量消费消息
	for i := 0; i < 10; i++ {
		msg, err := queue.Get()
		if err != nil {
			fmt.Println("获取消息出错:", err)
			break
		}
		fmt.Println("消费:", msg)
	}
}

4. 队列状态监控

func queueMonitoring() {
	manager := rapidmq.NewManager()
	defer manager.Close()
	
	queue, _ := manager.CreateQueue("monitor_queue")
	
	// 生产一些消息
	for i := 0; i < 5; i++ {
		queue.Put(fmt.Sprintf("测试消息%d", i))
	}
	
	// 获取队列信息
	info := queue.GetInfo()
	fmt.Printf("队列信息: 名称=%s, 消息数=%d, 消费者数=%d\n", 
		info.Name, info.MessageCount, info.ConsumerCount)
}

高级用法

1. 自定义消息处理器

func customHandlerExample() {
	manager := rapidmq.NewManager()
	defer manager.Close()
	
	queue, _ := manager.CreateQueue("handler_queue")
	
	// 注册消息处理器
	queue.SetHandler(func(msg interface{}) {
		fmt.Printf("处理器收到消息: %v\n", msg)
	})
	
	// 发送消息会自动触发处理器
	queue.Put("测试处理器")
}

2. 消息确认机制

func ackExample() {
	manager := rapidmq.NewManager()
	defer manager.Close()
	
	queue, _ := manager.CreateQueue("ack_queue")
	
	// 启用确认模式
	queue.EnableAck()
	
	// 生产消息
	queue.Put("需要确认的消息")
	
	// 获取消息但不确认
	msg, _ := queue.Get()
	fmt.Println("收到消息:", msg)
	
	// 再次获取会得到相同的消息
	msg2, _ := queue.Get()
	fmt.Println("再次获取:", msg2)
	
	// 确认消息
	queue.Ack()
	
	// 现在消息已被移除
	_, err := queue.Get()
	if err != nil {
		fmt.Println("队列已空:", err)
	}
}

性能优化建议

  1. 对于高频消息场景,考虑批量生产/消费
  2. 合理设置队列容量,避免内存过度消耗
  3. 长时间不用的队列应及时关闭
  4. 考虑结合goroutine实现并发消费

注意事项

  • RapidMQ是内存队列,程序重启后消息会丢失
  • 不适合分布式场景,仅限单机使用
  • 大量消息堆积可能导致内存压力

RapidMQ作为轻量级本地消息队列解决方案,非常适合小型项目或作为大型系统的本地组件使用。其简洁的API和高效的实现使其成为Golang开发中处理消息队列需求的良好选择。

回到顶部