golang基于Redis流的高效队列生产消费插件库redisqueue的使用

Golang基于Redis流的高效队列生产消费插件库redisqueue的使用

简介

redisqueue 是一个基于Redis流的Golang队列库,提供了生产者和消费者功能,可以高效地处理消息队列。

Version

特性

  • Producer结构体简化消息入队
  • Consumer结构体支持并发处理消息
  • 消息认领和确认机制,保证至少一次投递
  • 可见性超时设置,超时后由其他消费者处理
  • 流最大长度限制,避免内存耗尽
  • 优雅处理Unix信号(SIGINT和SIGTERM)
  • 错误处理通道集中处理错误
  • 优雅处理panic避免进程崩溃
  • 并发控制设置
  • 批量大小限制
  • 多流支持

安装

确保使用支持Go Modules的Go版本:

go mod init github.com/my/repo
go get github.com/robinjoseph08/redisqueue/v2

导入:

import "github.com/robinjoseph08/redisqueue/v2"

示例代码

生产者示例

package main

import (
	"fmt"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	// 创建生产者配置
	p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
		StreamMaxLength:      10000,  // 设置流最大长度
		ApproximateMaxLength: true,   // 使用近似最大长度
	})
	if err != nil {
		panic(err)
	}

	// 生产1000条消息
	for i := 0; i < 1000; i++ {
		err := p.Enqueue(&redisqueue.Message{
			Stream: "redisqueue:test",  // 设置流名称
			Values: map[string]interface{}{
				"index": i,  // 消息内容
			},
		})
		if err != nil {
			panic(err)
		}

		// 每100条打印一次进度
		if i%100 == 0 {
			fmt.Printf("enqueued %d\n", i)
		}
	}
}

消费者示例

package main

import (
	"fmt"
	"time"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	// 创建消费者配置
	c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
		VisibilityTimeout: 60 * time.Second,  // 可见性超时
		BlockingTimeout:  5 * time.Second,   // 阻塞超时
		ReclaimInterval:  1 * time.Second,   // 回收间隔
		BufferSize:       100,               // 缓冲区大小
		Concurrency:      10,                // 并发数
	})
	if err != nil {
		panic(err)
	}

	// 注册处理函数
	c.Register("redisqueue:test", process)

	// 错误处理协程
	go func() {
		for err := range c.Errors {
			// 处理错误
			fmt.Printf("err: %+v\n", err)
		}
	}()

	fmt.Println("starting")
	c.Run()  // 启动消费者
	fmt.Println("stopped")
}

// 消息处理函数
func process(msg *redisqueue.Message) error {
	fmt.Printf("processing message: %v\n", msg.Values["index"])
	return nil
}

这个示例展示了如何使用redisqueue库创建一个生产者和消费者,生产者向Redis流中发送消息,消费者从流中读取并处理消息。


更多关于golang基于Redis流的高效队列生产消费插件库redisqueue的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于Redis流的高效队列生产消费插件库redisqueue的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang基于Redis流的高效队列生产消费插件库redisqueue使用指南

概述

redisqueue是一个基于Redis Streams的Golang库,提供了高效的消息队列生产消费功能。相比传统队列,Redis Streams提供了更强大的功能,如消息持久化、消费者组、消息确认机制等。

安装

go get github.com/adjust/redisqueue/v2

基本使用

1. 生产者示例

package main

import (
	"context"
	"log"
	"time"

	"github.com/adjust/redisqueue/v2"
	"github.com/go-redis/redis/v8"
)

func main() {
	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // 无密码
		DB:       0,  // 默认DB
	})

	// 创建生产者
	producer, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
		StreamMaxLength:      1000, // 流最大长度
		ApproximateMaxLength: true, // 近似最大长度
		RedisClient:         client,
	})
	if err != nil {
		log.Fatal(err)
	}

	// 发送消息
	for i := 0; i < 10; i++ {
		err := producer.Enqueue(&redisqueue.Message{
			Stream: "my_stream", // 流名称
			Values: map[string]interface{}{
				"message":   "Hello, Redis Stream!",
				"timestamp": time.Now().Unix(),
				"count":     i,
			},
		})
		if err != nil {
			log.Printf("Failed to enqueue message: %v", err)
		} else {
			log.Printf("Message %d enqueued successfully", i)
		}
	}
}

2. 消费者示例

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/adjust/redisqueue/v2"
	"github.com/go-redis/redis/v8"
)

func main() {
	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // 无密码
		DB:       0,  // 默认DB
	})

	// 创建消费者
	consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
		VisibilityTimeout: 60 * time.Second, // 消息可见超时
		BlockingTimeout:  5 * time.Second,  // 阻塞超时
		ReclaimInterval:  1 * time.Second,  // 回收间隔
		BufferSize:       100,              // 缓冲区大小
		Concurrency:      5,                // 并发数
		RedisClient:      client,
	})
	if err != nil {
		log.Fatal(err)
	}

	// 注册消息处理函数
	consumer.Register("my_stream", "my_consumer_group", processMessage)

	// 启动消费者
	go func() {
		if err := consumer.Run(); err != nil {
			log.Fatal(err)
		}
	}()

	// 等待退出信号
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	// 优雅关闭
	if err := consumer.Shutdown(); err != nil {
		log.Fatal(err)
	}
}

// 消息处理函数
func processMessage(msg *redisqueue.Message) error {
	log.Printf("Received message: %v", msg.Values)
	// 处理消息...
	// 返回nil表示处理成功,消息将被确认
	// 返回错误表示处理失败,消息将被重新投递
	return nil
}

高级特性

1. 消费者组

Redis Streams支持消费者组,允许多个消费者共同消费同一个流:

// 创建消费者组
err := client.XGroupCreateMkStream(context.Background(), "my_stream", "my_consumer_group", "0").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
    log.Fatal(err)
}

// 多个消费者可以注册到同一个消费者组
consumer1.Register("my_stream", "my_consumer_group", processMessage1)
consumer2.Register("my_stream", "my_consumer_group", processMessage2)

2. 消息确认

消息处理成功后需要确认:

func processMessage(msg *redisqueue.Message) error {
    // 处理消息...
    if err := msg.Ack(); err != nil {
        log.Printf("Failed to ack message: %v", err)
        return err
    }
    return nil
}

3. 自定义错误处理

consumer.SetErrorHandler(func(err error) {
    log.Printf("Consumer error: %v", err)
})

性能优化建议

  1. 批量处理:适当增加BufferSizeConcurrency参数可以提高吞吐量
  2. 合理设置超时:根据业务特点调整VisibilityTimeoutBlockingTimeout
  3. 监控:监控队列长度和消费者延迟,及时扩展消费者
  4. 错误重试:实现合理的错误重试机制

总结

redisqueue库提供了简单易用的API来操作Redis Streams,适合构建高吞吐量、可靠的消息队列系统。通过消费者组、消息确认等机制,可以构建健壮的分布式消息处理系统。

相比传统Redis列表实现的队列,Redis Streams提供了更丰富的功能,如消息持久化、消费者组、消息历史查询等,是构建现代消息系统的理想选择。

回到顶部