golang基于Redis集群流的高可靠队列客户端插件ami的使用
Golang基于Redis集群流的高可靠队列客户端插件Ami的使用
Ami是一个基于Redis Cluster Streams的高可靠队列客户端插件,专为Golang设计。
性能表现
性能取决于以下因素:
- Redis集群节点数量
- 客户端到Redis集群主节点的ping延迟
- 节点间的网络速度
- 消息大小
- Ami配置
示例性能数据(10节点Redis集群,部分节点在另一个数据中心,50ms ping延迟,1主1从,消息为"{}"):
$ go run examples/performance/main.go
Produced 1000000 in 3.423883 sec, rps 292066.022156
Consumed 151000 in 1.049238 sec, rps 143913.931722
Acked 151000 in 0.973587 sec, rps 155096.612263
生产者示例
package main
import (
"time"
"github.com/kak-tus/ami"
"github.com/go-redis/redis/v8"
)
// 错误日志处理器
type errorLogger struct{}
func (l *errorLogger) AmiError(err error) {
println("Got error from Ami:", err.Error())
}
func main() {
// 创建生产者
pr, err := ami.NewProducer(
ami.ProducerOptions{
ErrorNotifier: &errorLogger{}, // 错误通知器
Name: "ruthie", // 队列名称
PendingBufferSize: 10000000, // 待处理缓冲区大小
PipeBufferSize: 50000, // 管道缓冲区大小
PipePeriod: time.Microsecond * 1000, // 管道周期
ShardsCount: 10, // 分片数量
},
&redis.ClusterOptions{
Addrs: []string{"172.17.0.1:7001", "172.17.0.1:7002"}, // Redis集群地址
ReadTimeout: time.Second * 60, // 读取超时
WriteTimeout: time.Second * 60, // 写入超时
},
)
if err != nil {
panic(err)
}
// 发送10000条消息
for i := 0; i < 10000; i++ {
pr.Send("{}")
}
// 关闭生产者
pr.Close()
}
消费者示例
package main
import (
"sync"
"time"
"github.com/kak-tus/ami"
"github.com/go-redis/redis/v8"
)
// 错误日志处理器
type errorLogger struct{}
func (l *errorLogger) AmiError(err error) {
println("Got error from Ami:", err.Error())
}
func main() {
// 创建消费者
cn, err := ami.NewConsumer(
ami.ConsumerOptions{
Consumer: "alice", // 消费者名称
ErrorNotifier: &errorLogger{}, // 错误通知器
Name: "ruthie", // 队列名称
PendingBufferSize: 10000000, // 待处理缓冲区大小
PipeBufferSize: 50000, // 管道缓冲区大小
PipePeriod: time.Microsecond * 1000, // 管道周期
PrefetchCount: 100, // 预取数量
ShardsCount: 10, // 分片数量
},
&redis.ClusterOptions{
Addrs: []string{"172.17.0.1:7001", "172.17.0.1:7002"}, // Redis集群地址
ReadTimeout: time.Second * 60, // 读取超时
WriteTimeout: time.Second * 60, // 写入超时
},
)
if err != nil {
panic(err)
}
// 启动消费者
c := cn.Start()
// 使用WaitGroup等待处理完成
wg := sync.WaitGroup{}
wg.Add(1)
// 启动goroutine处理消息
go func() {
for {
m, more := <-c
if !more {
break
}
println("Got", m.Body, "ID", m.ID)
cn.Ack(m) // 确认消息
}
wg.Done()
}()
// 等待1秒
time.Sleep(time.Second)
// 停止消费者
cn.Stop()
wg.Wait()
// 关闭消费者
cn.Close()
}
以上示例展示了如何使用Ami库创建生产者和消费者,与Redis集群交互实现高可靠的消息队列功能。
更多关于golang基于Redis集群流的高可靠队列客户端插件ami的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang基于Redis集群流的高可靠队列客户端插件ami的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang基于Redis集群流的高可靠队列客户端插件ami使用指南
什么是ami
ami是一个基于Redis Streams的高可靠消息队列客户端插件,专为Golang设计。它提供了消息的生产、消费、确认等完整功能,特别适合构建分布式系统中的可靠消息队列。
主要特性
- 基于Redis Streams实现,支持Redis集群
- 消息持久化和可靠性保证
- 消费者组支持,实现负载均衡
- 消息重试和死信队列机制
- 简洁易用的API接口
安装
go get github.com/ami-team/ami
基本使用示例
1. 初始化客户端
package main
import (
"context"
"fmt"
"time"
"github.com/ami-team/ami"
"github.com/go-redis/redis/v8"
)
func main() {
// 创建Redis客户端
redisClient := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
})
// 创建ami客户端
client, err := ami.NewClient(
ami.WithRedisClient(redisClient),
ami.WithStreamName("my_stream"),
ami.WithConsumerGroup("my_group"),
ami.WithConsumerName("consumer1"),
)
if err != nil {
panic(err)
}
defer client.Close()
// 使用示例...
}
2. 生产消息
// 生产消息
msgID, err := client.Produce(context.Background(), &ami.Message{
Body: []byte("hello world"),
Headers: map[string]interface{}{
"key1": "value1",
"key2": 123,
},
})
if err != nil {
fmt.Printf("Produce message failed: %v\n", err)
return
}
fmt.Printf("Message produced with ID: %s\n", msgID)
3. 消费消息
// 消费消息
messages, err := client.Consume(context.Background(), 10) // 一次最多获取10条消息
if err != nil {
fmt.Printf("Consume message failed: %v\n", err)
return
}
for _, msg := range messages {
fmt.Printf("Received message: ID=%s, Body=%s, Headers=%v\n",
msg.ID, string(msg.Body), msg.Headers)
// 处理消息...
// 确认消息处理完成
if err := client.Ack(context.Background(), msg.ID); err != nil {
fmt.Printf("Ack message failed: %v\n", err)
continue
}
}
高级功能
1. 消费者组管理
// 创建消费者组
err := client.CreateConsumerGroup(context.Background(), "my_group", "$")
if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
// 忽略"消费者组已存在"的错误
panic(err)
}
// 获取消费者组信息
groups, err := client.ConsumerGroups(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Consumer groups: %v\n", groups)
2. 消息重试和死信队列
// 消费消息并处理失败时进行重试
messages, err := client.Consume(context.Background(), 10)
if err != nil {
panic(err)
}
for _, msg := range messages {
err := processMessage(msg)
if err != nil {
// 处理失败,进行重试
retryCount := getRetryCount(msg.Headers)
if retryCount < 3 {
// 更新重试次数并重新入队
msg.Headers["retry_count"] = retryCount + 1
_, err := client.Produce(context.Background(), msg)
if err != nil {
fmt.Printf("Retry produce failed: %v\n", err)
}
} else {
// 超过重试次数,进入死信队列
_, err := client.ProduceToDLQ(context.Background(), msg)
if err != nil {
fmt.Printf("Produce to DLQ failed: %v\n", err)
}
}
}
// 无论成功失败都确认消息
_ = client.Ack(context.Background(), msg.ID)
}
3. 批量生产和消费
// 批量生产消息
msgs := []*ami.Message{
{Body: []byte("msg1")},
{Body: []byte("msg2")},
{Body: []byte("msg3")},
}
msgIDs, err := client.BatchProduce(context.Background(), msgs)
if err != nil {
panic(err)
}
fmt.Printf("Batch produced message IDs: %v\n", msgIDs)
// 批量确认消息
var ids []string
for _, msg := range messages {
ids = append(ids, msg.ID)
}
err = client.BatchAck(context.Background(), ids)
if err != nil {
panic(err)
}
最佳实践
-
合理设置消费者组:为不同的服务使用不同的消费者组,实现消息的广播
-
处理Pending消息:定期检查Pending消息,防止消息堆积
pending, err := client.PendingMessages(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Pending messages: %v\n", pending)
- 监控队列长度:监控队列长度,及时发现处理能力不足
length, err := client.StreamLength(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Stream length: %d\n", length)
- 优雅关闭:在服务关闭时确保所有消息处理完成
func gracefulShutdown(client *ami.Client) {
// 停止接收新消息
client.StopConsuming()
// 等待处理中的消息完成
time.Sleep(5 * time.Second)
// 关闭客户端
client.Close()
}
总结
ami插件为Golang提供了基于Redis Streams的高可靠消息队列解决方案,具有简单易用、高可靠性和高性能的特点。通过合理配置和使用,可以构建出稳定可靠的分布式消息系统。