golang基于Redis集群流的高可靠队列客户端插件ami的使用

Golang基于Redis集群流的高可靠队列客户端插件Ami的使用

Ami是一个基于Redis Cluster Streams的高可靠队列客户端插件,专为Golang设计。

性能表现

性能取决于以下因素:

  • Redis集群节点数量
  • 客户端到Redis集群主节点的ping延迟
  • 节点间的网络速度
  • 消息大小
  • Ami配置

示例性能数据(10节点Redis集群,部分节点在另一个数据中心,50ms ping延迟,1主1从,消息为"{}"):

$ go run examples/performance/main.go
Produced 1000000 in 3.423883 sec, rps 292066.022156
Consumed 151000 in 1.049238 sec, rps 143913.931722
Acked 151000 in 0.973587 sec, rps 155096.612263

生产者示例

package main

import (
	"time"
	
	"github.com/kak-tus/ami"
	"github.com/go-redis/redis/v8"
)

// 错误日志处理器
type errorLogger struct{}

func (l *errorLogger) AmiError(err error) {
	println("Got error from Ami:", err.Error())
}

func main() {
	// 创建生产者
	pr, err := ami.NewProducer(
		ami.ProducerOptions{
			ErrorNotifier:     &errorLogger{},  // 错误通知器
			Name:              "ruthie",       // 队列名称
			PendingBufferSize: 10000000,       // 待处理缓冲区大小
			PipeBufferSize:    50000,          // 管道缓冲区大小
			PipePeriod:        time.Microsecond * 1000, // 管道周期
			ShardsCount:       10,             // 分片数量
		},
		&redis.ClusterOptions{
			Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"}, // Redis集群地址
			ReadTimeout:  time.Second * 60,  // 读取超时
			WriteTimeout: time.Second * 60,  // 写入超时
		},
	)
	if err != nil {
		panic(err)
	}

	// 发送10000条消息
	for i := 0; i < 10000; i++ {
		pr.Send("{}")
	}

	// 关闭生产者
	pr.Close()
}

消费者示例

package main

import (
	"sync"
	"time"
	
	"github.com/kak-tus/ami"
	"github.com/go-redis/redis/v8"
)

// 错误日志处理器
type errorLogger struct{}

func (l *errorLogger) AmiError(err error) {
	println("Got error from Ami:", err.Error())
}

func main() {
	// 创建消费者
	cn, err := ami.NewConsumer(
		ami.ConsumerOptions{
			Consumer:          "alice",       // 消费者名称
			ErrorNotifier:     &errorLogger{}, // 错误通知器
			Name:              "ruthie",      // 队列名称
			PendingBufferSize: 10000000,      // 待处理缓冲区大小
			PipeBufferSize:    50000,         // 管道缓冲区大小
			PipePeriod:        time.Microsecond * 1000, // 管道周期
			PrefetchCount:     100,           // 预取数量
			ShardsCount:       10,            // 分片数量
		},
		&redis.ClusterOptions{
			Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"}, // Redis集群地址
			ReadTimeout:  time.Second * 60,  // 读取超时
			WriteTimeout: time.Second * 60,  // 写入超时
		},
	)
	if err != nil {
		panic(err)
	}

	// 启动消费者
	c := cn.Start()

	// 使用WaitGroup等待处理完成
	wg := sync.WaitGroup{}
	wg.Add(1)

	// 启动goroutine处理消息
	go func() {
		for {
			m, more := <-c
			if !more {
				break
			}
			println("Got", m.Body, "ID", m.ID)
			cn.Ack(m)  // 确认消息
		}
		wg.Done()
	}()

	// 等待1秒
	time.Sleep(time.Second)

	// 停止消费者
	cn.Stop()
	wg.Wait()

	// 关闭消费者
	cn.Close()
}

以上示例展示了如何使用Ami库创建生产者和消费者,与Redis集群交互实现高可靠的消息队列功能。


更多关于golang基于Redis集群流的高可靠队列客户端插件ami的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于Redis集群流的高可靠队列客户端插件ami的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang基于Redis集群流的高可靠队列客户端插件ami使用指南

什么是ami

ami是一个基于Redis Streams的高可靠消息队列客户端插件,专为Golang设计。它提供了消息的生产、消费、确认等完整功能,特别适合构建分布式系统中的可靠消息队列。

主要特性

  • 基于Redis Streams实现,支持Redis集群
  • 消息持久化和可靠性保证
  • 消费者组支持,实现负载均衡
  • 消息重试和死信队列机制
  • 简洁易用的API接口

安装

go get github.com/ami-team/ami

基本使用示例

1. 初始化客户端

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ami-team/ami"
	"github.com/go-redis/redis/v8"
)

func main() {
	// 创建Redis客户端
	redisClient := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
	})

	// 创建ami客户端
	client, err := ami.NewClient(
		ami.WithRedisClient(redisClient),
		ami.WithStreamName("my_stream"),
		ami.WithConsumerGroup("my_group"),
		ami.WithConsumerName("consumer1"),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	
	// 使用示例...
}

2. 生产消息

// 生产消息
msgID, err := client.Produce(context.Background(), &ami.Message{
	Body: []byte("hello world"),
	Headers: map[string]interface{}{
		"key1": "value1",
		"key2": 123,
	},
})
if err != nil {
	fmt.Printf("Produce message failed: %v\n", err)
	return
}
fmt.Printf("Message produced with ID: %s\n", msgID)

3. 消费消息

// 消费消息
messages, err := client.Consume(context.Background(), 10) // 一次最多获取10条消息
if err != nil {
	fmt.Printf("Consume message failed: %v\n", err)
	return
}

for _, msg := range messages {
	fmt.Printf("Received message: ID=%s, Body=%s, Headers=%v\n", 
		msg.ID, string(msg.Body), msg.Headers)
	
	// 处理消息...
	
	// 确认消息处理完成
	if err := client.Ack(context.Background(), msg.ID); err != nil {
		fmt.Printf("Ack message failed: %v\n", err)
		continue
	}
}

高级功能

1. 消费者组管理

// 创建消费者组
err := client.CreateConsumerGroup(context.Background(), "my_group", "$")
if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
	// 忽略"消费者组已存在"的错误
	panic(err)
}

// 获取消费者组信息
groups, err := client.ConsumerGroups(context.Background())
if err != nil {
	panic(err)
}
fmt.Printf("Consumer groups: %v\n", groups)

2. 消息重试和死信队列

// 消费消息并处理失败时进行重试
messages, err := client.Consume(context.Background(), 10)
if err != nil {
	panic(err)
}

for _, msg := range messages {
	err := processMessage(msg)
	if err != nil {
		// 处理失败,进行重试
		retryCount := getRetryCount(msg.Headers)
		if retryCount < 3 {
			// 更新重试次数并重新入队
			msg.Headers["retry_count"] = retryCount + 1
			_, err := client.Produce(context.Background(), msg)
			if err != nil {
				fmt.Printf("Retry produce failed: %v\n", err)
			}
		} else {
			// 超过重试次数,进入死信队列
			_, err := client.ProduceToDLQ(context.Background(), msg)
			if err != nil {
				fmt.Printf("Produce to DLQ failed: %v\n", err)
			}
		}
	}
	
	// 无论成功失败都确认消息
	_ = client.Ack(context.Background(), msg.ID)
}

3. 批量生产和消费

// 批量生产消息
msgs := []*ami.Message{
	{Body: []byte("msg1")},
	{Body: []byte("msg2")},
	{Body: []byte("msg3")},
}
msgIDs, err := client.BatchProduce(context.Background(), msgs)
if err != nil {
	panic(err)
}
fmt.Printf("Batch produced message IDs: %v\n", msgIDs)

// 批量确认消息
var ids []string
for _, msg := range messages {
	ids = append(ids, msg.ID)
}
err = client.BatchAck(context.Background(), ids)
if err != nil {
	panic(err)
}

最佳实践

  1. 合理设置消费者组:为不同的服务使用不同的消费者组,实现消息的广播

  2. 处理Pending消息:定期检查Pending消息,防止消息堆积

pending, err := client.PendingMessages(context.Background())
if err != nil {
	panic(err)
}
fmt.Printf("Pending messages: %v\n", pending)
  1. 监控队列长度:监控队列长度,及时发现处理能力不足
length, err := client.StreamLength(context.Background())
if err != nil {
	panic(err)
}
fmt.Printf("Stream length: %d\n", length)
  1. 优雅关闭:在服务关闭时确保所有消息处理完成
func gracefulShutdown(client *ami.Client) {
	// 停止接收新消息
	client.StopConsuming()
	
	// 等待处理中的消息完成
	time.Sleep(5 * time.Second)
	
	// 关闭客户端
	client.Close()
}

总结

ami插件为Golang提供了基于Redis Streams的高可靠消息队列解决方案,具有简单易用、高可靠性和高性能的特点。通过合理配置和使用,可以构建出稳定可靠的分布式消息系统。

回到顶部