golang轻量级本地消息队列管理插件库RapidMQ的使用
Golang轻量级本地消息队列管理插件库RapidMQ的使用
简介
RapidMQ是一个纯Go语言实现的、高效、轻量级且可靠的本地消息队列管理库。
安装
go get github.com/sybrexsys/RapidMQ/queue
要求
- 需要Go 1.4或更高版本
使用示例
基本队列操作
package main
import (
"github.com/sybrexsys/RapidMQ/queue"
"log"
)
// 自定义日志实现
type MyLogger struct{}
func (l *MyLogger) Trace(msg string, a ...interface{}) {
log.Printf("[TRACE] "+msg, a...)
}
func (l *MyLogger) Info(msg string, a ...interface{}) {
log.Printf("[INFO] "+msg, a...)
}
func (l *MyLogger) Warning(msg string, a ...interface{}) {
log.Printf("[WARN] "+msg, a...)
}
func (l *MyLogger) Error(msg string, a ...interface{}) {
log.Printf("[ERROR] "+msg, a...)
}
// 自定义Worker实现
type MyWorker struct {
id WorkerID
}
func (w *MyWorker) ProcessMessage(q *queue.Queue, msg *queue.Message, ch chan Worker) {
log.Printf("Processing message ID: %d, Content: %s", msg.ID, string(msg.Buffer))
// 处理完成后通知队列
q.Process(w.id, true)
ch <- w
}
func (w *MyWorker) ProcessTimeout(q *queue.Queue, ch chan Worker) {
// 超时处理
q.Process(w.id, true)
ch <- w
}
func (w *MyWorker) GetID() WorkerID {
return w.id
}
func (w *MyWorker) Close() {
log.Printf("Worker %d closed", w.id)
}
// 自定义WorkerFactory实现
type MyWorkerFactory struct{}
func (f *MyWorkerFactory) CreateWorker() Worker {
return &MyWorker{id: WorkerID(time.Now().UnixNano())}
}
func (f *MyWorkerFactory) NeedTimeoutProcessing() bool {
return false
}
func main() {
// 创建队列
q, err := queue.CreateQueue(
"MyQueue", // 队列名称
"./queue_storage", // 存储路径
&MyLogger{}, // 日志记录器
&MyWorkerFactory{}, // Worker工厂
nil, // 选项(使用默认)
)
if err != nil {
log.Fatal(err)
}
defer q.Close()
// 插入消息
message := []byte("Hello, RapidMQ!")
if !q.Insert(message) {
log.Println("Failed to insert message")
}
// 获取队列中的消息数量
count := q.Count()
log.Printf("Queue contains %d messages", count)
}
消息结构
type Message struct {
ID StorageIdx // 消息ID
Buffer []byte // 消息内容缓冲区
}
WorkerFactory接口
type WorkerFactory interface {
CreateWorker() Worker // 创建新Worker
NeedTimeoutProcessing() bool // 是否需要超时处理
}
Worker接口
type Worker interface {
ProcessMessage(*Queue, *Message, chan Worker) // 处理消息
ProcessTimeout(*Queue, chan Worker) // 处理超时
GetID() WorkerID // 获取Worker ID
Close() // 关闭Worker
}
日志接口
type Logging interface {
Trace(msg string, a ...interface{}) // 跟踪日志
Info(msg string, a ...interface{}) // 信息日志
Warning(msg string, a ...interface{}) // 警告日志
Error(msg string, a ...interface{}) // 错误日志
}
作者
Vadim Shakun: vadim.shakun@gmail.com
许可证
RapidMQ使用Apache 2.0许可证。
更多关于golang轻量级本地消息队列管理插件库RapidMQ的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang轻量级本地消息队列管理插件库RapidMQ的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
RapidMQ - Golang轻量级本地消息队列管理库
RapidMQ是一个轻量级的本地消息队列管理库,专为Golang设计,适用于需要简单高效消息队列功能的场景。下面我将详细介绍其使用方法和示例代码。
主要特性
- 轻量级设计,无外部依赖
- 支持多队列管理
- 简单的生产/消费模型
- 内存存储,高性能
- 线程安全
安装
go get github.com/sybrexsys/RapidMQ
基本使用示例
1. 创建队列管理器
package main
import (
"fmt"
"github.com/sybrexsys/RapidMQ"
)
func main() {
// 创建队列管理器
manager := rapidmq.NewManager()
defer manager.Close()
// 创建一个名为"test"的队列
queue, err := manager.CreateQueue("test")
if err != nil {
fmt.Println("创建队列失败:", err)
return
}
// 生产消息
err = queue.Put("Hello, RapidMQ!")
if err != nil {
fmt.Println("生产消息失败:", err)
return
}
// 消费消息
msg, err := queue.Get()
if err != nil {
fmt.Println("消费消息失败:", err)
return
}
fmt.Println("收到消息:", msg)
}
2. 多队列管理
func multiQueueExample() {
manager := rapidmq.NewManager()
defer manager.Close()
// 创建多个队列
queue1, _ := manager.CreateQueue("queue1")
queue2, _ := manager.CreateQueue("queue2")
// 向不同队列发送消息
queue1.Put("消息1")
queue2.Put("消息2")
// 从不同队列获取消息
msg1, _ := queue1.Get()
msg2, _ := queue2.Get()
fmt.Println("queue1:", msg1)
fmt.Println("queue2:", msg2)
}
3. 批量操作
func batchOperations() {
manager := rapidmq.NewManager()
defer manager.Close()
queue, _ := manager.CreateQueue("batch_queue")
// 批量生产消息
for i := 0; i < 10; i++ {
queue.Put(fmt.Sprintf("消息%d", i))
}
// 批量消费消息
for i := 0; i < 10; i++ {
msg, err := queue.Get()
if err != nil {
fmt.Println("获取消息出错:", err)
break
}
fmt.Println("消费:", msg)
}
}
4. 队列状态监控
func queueMonitoring() {
manager := rapidmq.NewManager()
defer manager.Close()
queue, _ := manager.CreateQueue("monitor_queue")
// 生产一些消息
for i := 0; i < 5; i++ {
queue.Put(fmt.Sprintf("测试消息%d", i))
}
// 获取队列信息
info := queue.GetInfo()
fmt.Printf("队列信息: 名称=%s, 消息数=%d, 消费者数=%d\n",
info.Name, info.MessageCount, info.ConsumerCount)
}
高级用法
1. 自定义消息处理器
func customHandlerExample() {
manager := rapidmq.NewManager()
defer manager.Close()
queue, _ := manager.CreateQueue("handler_queue")
// 注册消息处理器
queue.SetHandler(func(msg interface{}) {
fmt.Printf("处理器收到消息: %v\n", msg)
})
// 发送消息会自动触发处理器
queue.Put("测试处理器")
}
2. 消息确认机制
func ackExample() {
manager := rapidmq.NewManager()
defer manager.Close()
queue, _ := manager.CreateQueue("ack_queue")
// 启用确认模式
queue.EnableAck()
// 生产消息
queue.Put("需要确认的消息")
// 获取消息但不确认
msg, _ := queue.Get()
fmt.Println("收到消息:", msg)
// 再次获取会得到相同的消息
msg2, _ := queue.Get()
fmt.Println("再次获取:", msg2)
// 确认消息
queue.Ack()
// 现在消息已被移除
_, err := queue.Get()
if err != nil {
fmt.Println("队列已空:", err)
}
}
性能优化建议
- 对于高频消息场景,考虑批量生产/消费
- 合理设置队列容量,避免内存过度消耗
- 长时间不用的队列应及时关闭
- 考虑结合goroutine实现并发消费
注意事项
- RapidMQ是内存队列,程序重启后消息会丢失
- 不适合分布式场景,仅限单机使用
- 大量消息堆积可能导致内存压力
RapidMQ作为轻量级本地消息队列解决方案,非常适合小型项目或作为大型系统的本地组件使用。其简洁的API和高效的实现使其成为Golang开发中处理消息队列需求的良好选择。