Golang NATS集成工具类封装

最近在做一个需要集成NATS消息队列的Golang项目,想请教下大家有没有成熟的工具类封装方案?目前自己实现了一个基础版本,但感觉在连接管理、消息重试和错误处理等方面还不够完善。想了解下有没有开源的封装库推荐,或者在实际项目中大家都采用了哪些最佳实践?特别关注连接池管理、消息序列化和异常处理这几个方面。

2 回复

以下是一个简单的Golang NATS工具类封装示例:

package natsutil

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

type NatsClient struct {
	conn *nats.Conn
}

// 初始化NATS连接
func NewNatsClient(url string) (*NatsClient, error) {
	nc, err := nats.Connect(url,
		nats.Timeout(10*time.Second),
		nats.ReconnectWait(5*time.Second),
		nats.MaxReconnects(3),
	)
	if err != nil {
		return nil, err
	}
	
	return &NatsClient{conn: nc}, nil
}

// 发布消息
func (c *NatsClient) Publish(subject string, data []byte) error {
	return c.conn.Publish(subject, data)
}

// 订阅消息
func (c *NatsClient) Subscribe(subject string, handler nats.MsgHandler) (*nats.Subscription, error) {
	return c.conn.Subscribe(subject, handler)
}

// 关闭连接
func (c *NatsClient) Close() {
	c.conn.Close()
}

// 队列订阅
func (c *NatsClient) QueueSubscribe(subject, queue string, handler nats.MsgHandler) (*nats.Subscription, error) {
	return c.conn.QueueSubscribe(subject, queue, handler)
}

使用示例:

client, _ := NewNatsClient("nats://localhost:4222")
defer client.Close()

// 发布
client.Publish("test.subject", []byte("Hello NATS"))

// 订阅
client.Subscribe("test.subject", func(m *nats.Msg) {
	log.Printf("收到消息: %s", string(m.Data))
})

主要特性:

  • 连接管理
  • 基础发布订阅
  • 队列订阅支持
  • 自动重连配置 可根据需要添加JSON序列化、请求回复模式等功能。

更多关于Golang NATS集成工具类封装的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


为Golang NATS消息系统封装一个工具类,可以简化连接管理和消息发布/订阅操作。以下是完整的实现:

package natsutil

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

type NatsClient struct {
	conn *nats.Conn
}

// NewNatsClient 创建NATS客户端
func NewNatsClient(url string, options ...nats.Option) (*NatsClient, error) {
	// 默认选项
	defaultOptions := []nats.Option{
		nats.Name("NATS Client"),
		nats.Timeout(10 * time.Second),
		nats.PingInterval(20 * time.Second),
		nats.MaxPingsOutstanding(5),
		nats.ReconnectWait(2 * time.Second),
		nats.MaxReconnects(60),
	}
	
	// 合并选项
	allOptions := append(defaultOptions, options...)
	
	conn, err := nats.Connect(url, allOptions...)
	if err != nil {
		return nil, err
	}

	client := &NatsClient{conn: conn}
	
	// 设置连接状态监听
	client.setupConnectionHandlers()
	
	return client, nil
}

// setupConnectionHandlers 设置连接状态处理器
func (c *NatsClient) setupConnectionHandlers() {
	c.conn.SetDisconnectHandler(func(nc *nats.Conn) {
		log.Printf("NATS连接断开")
	})
	
	c.conn.SetReconnectHandler(func(nc *nats.Conn) {
		log.Printf("NATS重新连接成功")
	})
	
	c.conn.SetClosedHandler(func(nc *nats.Conn) {
		log.Printf("NATS连接已关闭")
	})
}

// Publish 发布消息
func (c *NatsClient) Publish(subject string, data []byte) error {
	return c.conn.Publish(subject, data)
}

// Subscribe 订阅主题
func (c *NatsClient) Subscribe(subject string, handler nats.MsgHandler) (*nats.Subscription, error) {
	return c.conn.Subscribe(subject, handler)
}

// QueueSubscribe 队列订阅
func (c *NatsClient) QueueSubscribe(subject, queue string, handler nats.MsgHandler) (*nats.Subscription, error) {
	return c.conn.QueueSubscribe(subject, queue, handler)
}

// Request 发送请求并等待响应
func (c *NatsClient) Request(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) {
	return c.conn.Request(subject, data, timeout)
}

// Close 关闭连接
func (c *NatsClient) Close() {
	if c.conn != nil && !c.conn.IsClosed() {
		c.conn.Close()
	}
}

// IsConnected 检查连接状态
func (c *NatsClient) IsConnected() bool {
	return c.conn != nil && c.conn.IsConnected()
}

// Drain 优雅关闭
func (c *NatsClient) Drain() error {
	if c.conn != nil {
		return c.conn.Drain()
	}
	return nil
}

使用示例:

func main() {
	// 创建客户端
	client, err := NewNatsClient("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// 订阅消息
	subscription, err := client.Subscribe("user.created", func(msg *nats.Msg) {
		log.Printf("收到消息: %s", string(msg.Data))
	})
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Unsubscribe()

	// 发布消息
	err = client.Publish("user.created", []byte("Hello NATS!"))
	if err != nil {
		log.Fatal(err)
	}

	// 等待一段时间
	time.Sleep(1 * time.Second)
}

主要特性:

  • 连接管理:自动重连、连接状态监控
  • 简化API:封装常用操作(发布、订阅、请求)
  • 错误处理:内置连接状态处理器
  • 优雅关闭:支持Drain模式平滑关闭

这个工具类提供了NATS核心功能的简化接口,适合大多数应用场景。

回到顶部