golang高性能NSQ消息队列客户端插件库go-nsq的使用

golang高性能NSQ消息队列客户端插件库go-nsq的使用

go-nsq是NSQ消息队列的官方Go语言客户端库,提供了高性能的消息生产和消费功能。

安装

go get github.com/nsqio/go-nsq

生产者示例

package main

import (
	"github.com/nsqio/go-nsq"
	"log"
)

func main() {
	// 创建生产者配置
	config := nsq.NewConfig()
	
	// 创建生产者实例,连接到nsqd服务
	producer, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Fatal(err)
	}
	
	// 发布消息到"test"主题
	err = producer.Publish("test", []byte("hello nsq"))
	if err != nil {
		log.Fatal("publish error: " + err.Error())
	}
	
	// 优雅停止生产者
	producer.Stop()
}

消费者示例

package main

import (
	"github.com/nsqio/go-nsq"
	"log"
)

// MyHandler 实现消息处理接口
type MyHandler struct{}

// HandleMessage 处理接收到的消息
func (h *MyHandler) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		// 返回nil会自动发送FIN命令,表示消息处理成功
		return nil
	}
	
	// 处理消息
	log.Printf("收到消息: %s", string(m.Body))
	
	// 返回nil表示处理成功
	return nil
}

func main() {
	// 创建消费者配置
	config := nsq.NewConfig()
	
	// 创建消费者实例
	consumer, err := nsq.NewConsumer("test", "channel1", config)
	if err != nil {
		log.Fatal(err)
	}
	
	// 设置消息处理器
	consumer.AddHandler(&MyHandler{})
	
	// 连接到nsqd服务
	err = consumer.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		log.Fatal(err)
	}
	
	// 阻塞主线程
	select {}
}

高级配置

// 创建自定义配置
config := nsq.NewConfig()
config.MaxInFlight = 10 // 设置最大并发处理消息数
config.MaxAttempts = 5  // 设置消息最大重试次数

// 设置心跳间隔和超时
config.HeartbeatInterval = 30 * time.Second
config.ReadTimeout = 60 * time.Second

使用Lookupd发现服务

// 消费者可以通过nsqlookupd发现服务
err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
    log.Fatal(err)
}

go-nsq提供了丰富的功能和配置选项,可以满足各种消息队列场景的需求。


更多关于golang高性能NSQ消息队列客户端插件库go-nsq的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能NSQ消息队列客户端插件库go-nsq的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


go-nsq: 高性能NSQ消息队列客户端库使用指南

概述

go-nsq是NSQ官方提供的Golang客户端库,用于与NSQ消息队列系统进行交互。它提供了高性能的生产者和消费者实现,支持NSQ的所有核心功能。

安装

go get github.com/nsqio/go-nsq

生产者(Producer)使用

基本生产者示例

package main

import (
	"log"
	"github.com/nsqio/go-nsq"
)

func main() {
	// 创建生产者配置
	config := nsq.NewConfig()
	
	// 创建生产者实例
	producer, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Fatal(err)
	}
	
	// 发布消息
	topicName := "test_topic"
	messageBody := []byte("hello nsq")
	err = producer.Publish(topicName, messageBody)
	if err != nil {
		log.Fatal("publish error: " + err.Error())
	}
	
	// 优雅停止
	producer.Stop()
}

批量发布消息

// 批量发布多条消息
messages := [][]byte{
	[]byte("message 1"),
	[]byte("message 2"),
	[]byte("message 3"),
}

err = producer.MultiPublish(topicName, messages)
if err != nil {
	log.Fatal("multi publish error: " + err.Error())
}

延迟消息

// 发布延迟消息(单位:毫秒)
delay := 5000 // 5秒后投递
err = producer.DeferredPublish(topicName, delay, messageBody)

消费者(Consumer)使用

基本消费者示例

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"github.com/nsqio/go-nsq"
)

// 消息处理handler
type myMessageHandler struct{}

// 实现HandleMessage方法
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		// 返回nil会自动将消息标记为已完成
		return nil
	}
	
	// 处理消息
	fmt.Printf("Received message: %s\n", string(m.Body))
	
	// 返回nil表示成功处理,消息会被标记为已完成
	return nil
}

func main() {
	// 创建消费者配置
	config := nsq.NewConfig()
	
	// 创建消费者实例
	topicName := "test_topic"
	channelName := "test_channel"
	consumer, err := nsq.NewConsumer(topicName, channelName, config)
	if err != nil {
		log.Fatal(err)
	}
	
	// 设置消息处理器
	consumer.AddHandler(&myMessageHandler{})
	
	// 连接到NSQD
	err = consumer.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		log.Fatal(err)
	}
	
	// 处理中断信号
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
	
	// 优雅停止
	consumer.Stop()
}

高级消费者配置

config := nsq.NewConfig()
config.MaxInFlight = 100        // 最大处理中的消息数
config.MaxAttempts = 5          // 最大重试次数
config.HeartbeatInterval = 30 * time.Second // 心跳间隔
config.RDYRedistributeInterval = 5 * time.Second // RDY重新分配间隔

// 设置消息处理并发数
consumer.SetConcurrency(10)

连接NSQLookupd

// 连接多个NSQLookupd实例
err = consumer.ConnectToNSQLookupds([]string{
	"127.0.0.1:4161",
	"127.0.0.1:4163",
})

高级特性

TLS配置

config := nsq.NewConfig()
config.TlsV1 = true
config.TlsConfig = &tls.Config{
	InsecureSkipVerify: true, // 仅用于测试环境
}

压缩

config := nsq.NewConfig()
config.Deflate = true
config.DeflateLevel = 6 // 压缩级别1-9

消息处理统计

stats := consumer.Stats()
fmt.Printf("Messages received: %d\n", stats.MessagesReceived)
fmt.Printf("Messages finished: %d\n", stats.MessagesFinished)
fmt.Printf("Messages requeued: %d\n", stats.MessagesRequeued)

最佳实践

  1. 生产者:

    • 复用Producer实例,避免频繁创建销毁
    • 合理设置批处理大小
    • 处理Publish错误并实现重试逻辑
  2. 消费者:

    • 根据处理能力设置MaxInFlight
    • 处理消息时考虑幂等性
    • 实现优雅停止,确保处理中的消息完成
  3. 错误处理:

    • 监控错误通道
    • 实现重试和死信队列机制

go-nsq提供了高性能的NSQ客户端实现,通过合理配置可以满足大多数消息队列场景的需求。在生产环境中,建议结合监控和日志系统,确保消息系统的可靠性和可观测性。

回到顶部