golang高性能NSQ消息队列客户端插件库go-nsq的使用
golang高性能NSQ消息队列客户端插件库go-nsq的使用
go-nsq是NSQ消息队列的官方Go语言客户端库,提供了高性能的消息生产和消费功能。
安装
go get github.com/nsqio/go-nsq
生产者示例
package main
import (
"github.com/nsqio/go-nsq"
"log"
)
func main() {
// 创建生产者配置
config := nsq.NewConfig()
// 创建生产者实例,连接到nsqd服务
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}
// 发布消息到"test"主题
err = producer.Publish("test", []byte("hello nsq"))
if err != nil {
log.Fatal("publish error: " + err.Error())
}
// 优雅停止生产者
producer.Stop()
}
消费者示例
package main
import (
"github.com/nsqio/go-nsq"
"log"
)
// MyHandler 实现消息处理接口
type MyHandler struct{}
// HandleMessage 处理接收到的消息
func (h *MyHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// 返回nil会自动发送FIN命令,表示消息处理成功
return nil
}
// 处理消息
log.Printf("收到消息: %s", string(m.Body))
// 返回nil表示处理成功
return nil
}
func main() {
// 创建消费者配置
config := nsq.NewConfig()
// 创建消费者实例
consumer, err := nsq.NewConsumer("test", "channel1", config)
if err != nil {
log.Fatal(err)
}
// 设置消息处理器
consumer.AddHandler(&MyHandler{})
// 连接到nsqd服务
err = consumer.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
log.Fatal(err)
}
// 阻塞主线程
select {}
}
高级配置
// 创建自定义配置
config := nsq.NewConfig()
config.MaxInFlight = 10 // 设置最大并发处理消息数
config.MaxAttempts = 5 // 设置消息最大重试次数
// 设置心跳间隔和超时
config.HeartbeatInterval = 30 * time.Second
config.ReadTimeout = 60 * time.Second
使用Lookupd发现服务
// 消费者可以通过nsqlookupd发现服务
err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
log.Fatal(err)
}
go-nsq提供了丰富的功能和配置选项,可以满足各种消息队列场景的需求。
更多关于golang高性能NSQ消息队列客户端插件库go-nsq的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang高性能NSQ消息队列客户端插件库go-nsq的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
go-nsq: 高性能NSQ消息队列客户端库使用指南
概述
go-nsq是NSQ官方提供的Golang客户端库,用于与NSQ消息队列系统进行交互。它提供了高性能的生产者和消费者实现,支持NSQ的所有核心功能。
安装
go get github.com/nsqio/go-nsq
生产者(Producer)使用
基本生产者示例
package main
import (
"log"
"github.com/nsqio/go-nsq"
)
func main() {
// 创建生产者配置
config := nsq.NewConfig()
// 创建生产者实例
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}
// 发布消息
topicName := "test_topic"
messageBody := []byte("hello nsq")
err = producer.Publish(topicName, messageBody)
if err != nil {
log.Fatal("publish error: " + err.Error())
}
// 优雅停止
producer.Stop()
}
批量发布消息
// 批量发布多条消息
messages := [][]byte{
[]byte("message 1"),
[]byte("message 2"),
[]byte("message 3"),
}
err = producer.MultiPublish(topicName, messages)
if err != nil {
log.Fatal("multi publish error: " + err.Error())
}
延迟消息
// 发布延迟消息(单位:毫秒)
delay := 5000 // 5秒后投递
err = producer.DeferredPublish(topicName, delay, messageBody)
消费者(Consumer)使用
基本消费者示例
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/nsqio/go-nsq"
)
// 消息处理handler
type myMessageHandler struct{}
// 实现HandleMessage方法
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// 返回nil会自动将消息标记为已完成
return nil
}
// 处理消息
fmt.Printf("Received message: %s\n", string(m.Body))
// 返回nil表示成功处理,消息会被标记为已完成
return nil
}
func main() {
// 创建消费者配置
config := nsq.NewConfig()
// 创建消费者实例
topicName := "test_topic"
channelName := "test_channel"
consumer, err := nsq.NewConsumer(topicName, channelName, config)
if err != nil {
log.Fatal(err)
}
// 设置消息处理器
consumer.AddHandler(&myMessageHandler{})
// 连接到NSQD
err = consumer.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
log.Fatal(err)
}
// 处理中断信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// 优雅停止
consumer.Stop()
}
高级消费者配置
config := nsq.NewConfig()
config.MaxInFlight = 100 // 最大处理中的消息数
config.MaxAttempts = 5 // 最大重试次数
config.HeartbeatInterval = 30 * time.Second // 心跳间隔
config.RDYRedistributeInterval = 5 * time.Second // RDY重新分配间隔
// 设置消息处理并发数
consumer.SetConcurrency(10)
连接NSQLookupd
// 连接多个NSQLookupd实例
err = consumer.ConnectToNSQLookupds([]string{
"127.0.0.1:4161",
"127.0.0.1:4163",
})
高级特性
TLS配置
config := nsq.NewConfig()
config.TlsV1 = true
config.TlsConfig = &tls.Config{
InsecureSkipVerify: true, // 仅用于测试环境
}
压缩
config := nsq.NewConfig()
config.Deflate = true
config.DeflateLevel = 6 // 压缩级别1-9
消息处理统计
stats := consumer.Stats()
fmt.Printf("Messages received: %d\n", stats.MessagesReceived)
fmt.Printf("Messages finished: %d\n", stats.MessagesFinished)
fmt.Printf("Messages requeued: %d\n", stats.MessagesRequeued)
最佳实践
-
生产者:
- 复用Producer实例,避免频繁创建销毁
- 合理设置批处理大小
- 处理Publish错误并实现重试逻辑
-
消费者:
- 根据处理能力设置MaxInFlight
- 处理消息时考虑幂等性
- 实现优雅停止,确保处理中的消息完成
-
错误处理:
- 监控错误通道
- 实现重试和死信队列机制
go-nsq提供了高性能的NSQ客户端实现,通过合理配置可以满足大多数消息队列场景的需求。在生产环境中,建议结合监控和日志系统,确保消息系统的可靠性和可观测性。