Golang NATS集成工具类封装
最近在做一个需要集成NATS消息队列的Golang项目,想请教下大家有没有成熟的工具类封装方案?目前自己实现了一个基础版本,但感觉在连接管理、消息重试和错误处理等方面还不够完善。想了解下有没有开源的封装库推荐,或者在实际项目中大家都采用了哪些最佳实践?特别关注连接池管理、消息序列化和异常处理这几个方面。
2 回复
以下是一个简单的Golang NATS工具类封装示例:
package natsutil
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
type NatsClient struct {
conn *nats.Conn
}
// 初始化NATS连接
func NewNatsClient(url string) (*NatsClient, error) {
nc, err := nats.Connect(url,
nats.Timeout(10*time.Second),
nats.ReconnectWait(5*time.Second),
nats.MaxReconnects(3),
)
if err != nil {
return nil, err
}
return &NatsClient{conn: nc}, nil
}
// 发布消息
func (c *NatsClient) Publish(subject string, data []byte) error {
return c.conn.Publish(subject, data)
}
// 订阅消息
func (c *NatsClient) Subscribe(subject string, handler nats.MsgHandler) (*nats.Subscription, error) {
return c.conn.Subscribe(subject, handler)
}
// 关闭连接
func (c *NatsClient) Close() {
c.conn.Close()
}
// 队列订阅
func (c *NatsClient) QueueSubscribe(subject, queue string, handler nats.MsgHandler) (*nats.Subscription, error) {
return c.conn.QueueSubscribe(subject, queue, handler)
}
使用示例:
client, _ := NewNatsClient("nats://localhost:4222")
defer client.Close()
// 发布
client.Publish("test.subject", []byte("Hello NATS"))
// 订阅
client.Subscribe("test.subject", func(m *nats.Msg) {
log.Printf("收到消息: %s", string(m.Data))
})
主要特性:
- 连接管理
- 基础发布订阅
- 队列订阅支持
- 自动重连配置 可根据需要添加JSON序列化、请求回复模式等功能。
更多关于Golang NATS集成工具类封装的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
为Golang NATS消息系统封装一个工具类,可以简化连接管理和消息发布/订阅操作。以下是完整的实现:
package natsutil
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
type NatsClient struct {
conn *nats.Conn
}
// NewNatsClient 创建NATS客户端
func NewNatsClient(url string, options ...nats.Option) (*NatsClient, error) {
// 默认选项
defaultOptions := []nats.Option{
nats.Name("NATS Client"),
nats.Timeout(10 * time.Second),
nats.PingInterval(20 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(60),
}
// 合并选项
allOptions := append(defaultOptions, options...)
conn, err := nats.Connect(url, allOptions...)
if err != nil {
return nil, err
}
client := &NatsClient{conn: conn}
// 设置连接状态监听
client.setupConnectionHandlers()
return client, nil
}
// setupConnectionHandlers 设置连接状态处理器
func (c *NatsClient) setupConnectionHandlers() {
c.conn.SetDisconnectHandler(func(nc *nats.Conn) {
log.Printf("NATS连接断开")
})
c.conn.SetReconnectHandler(func(nc *nats.Conn) {
log.Printf("NATS重新连接成功")
})
c.conn.SetClosedHandler(func(nc *nats.Conn) {
log.Printf("NATS连接已关闭")
})
}
// Publish 发布消息
func (c *NatsClient) Publish(subject string, data []byte) error {
return c.conn.Publish(subject, data)
}
// Subscribe 订阅主题
func (c *NatsClient) Subscribe(subject string, handler nats.MsgHandler) (*nats.Subscription, error) {
return c.conn.Subscribe(subject, handler)
}
// QueueSubscribe 队列订阅
func (c *NatsClient) QueueSubscribe(subject, queue string, handler nats.MsgHandler) (*nats.Subscription, error) {
return c.conn.QueueSubscribe(subject, queue, handler)
}
// Request 发送请求并等待响应
func (c *NatsClient) Request(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) {
return c.conn.Request(subject, data, timeout)
}
// Close 关闭连接
func (c *NatsClient) Close() {
if c.conn != nil && !c.conn.IsClosed() {
c.conn.Close()
}
}
// IsConnected 检查连接状态
func (c *NatsClient) IsConnected() bool {
return c.conn != nil && c.conn.IsConnected()
}
// Drain 优雅关闭
func (c *NatsClient) Drain() error {
if c.conn != nil {
return c.conn.Drain()
}
return nil
}
使用示例:
func main() {
// 创建客户端
client, err := NewNatsClient("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 订阅消息
subscription, err := client.Subscribe("user.created", func(msg *nats.Msg) {
log.Printf("收到消息: %s", string(msg.Data))
})
if err != nil {
log.Fatal(err)
}
defer subscription.Unsubscribe()
// 发布消息
err = client.Publish("user.created", []byte("Hello NATS!"))
if err != nil {
log.Fatal(err)
}
// 等待一段时间
time.Sleep(1 * time.Second)
}
主要特性:
- 连接管理:自动重连、连接状态监控
- 简化API:封装常用操作(发布、订阅、请求)
- 错误处理:内置连接状态处理器
- 优雅关闭:支持Drain模式平滑关闭
这个工具类提供了NATS核心功能的简化接口,适合大多数应用场景。

