golang实现NATS消息系统高性能通信插件库NATS Go Client的使用
Golang实现NATS消息系统高性能通信插件库NATS Go Client的使用
简介
NATS Go Client是一个用于NATS消息系统的Go语言客户端库。NATS是一个高性能的云原生消息系统,适用于分布式系统、微服务架构和物联网应用。
安装
# 获取最新发布的Go客户端
go get github.com/nats-io/nats.go@latest
# 获取特定版本
go get github.com/nats-io/nats.go@v1.43.0
# 注意NATS Server的最新主版本是v2
go get github.com/nats-io/nats-server/v2@latest
基本用法
import "github.com/nats-io/nats.go"
// 连接到服务器
nc, _ := nats.Connect(nats.DefaultURL)
// 简单发布者
nc.Publish("foo", []byte("Hello World"))
// 简单异步订阅者
nc.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})
// 响应请求消息
nc.Subscribe("request", func(m *nats.Msg) {
m.Respond([]byte("answer is 42"))
})
// 简单同步订阅者
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)
// 通道订阅者
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch
// 取消订阅
sub.Unsubscribe()
// 排空
sub.Drain()
// 请求
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)
// 回复
nc.Subscribe("help", func(m *nats.Msg) {
nc.Publish(m.Reply, []byte("I can help!"))
})
// 排空连接(响应者首选)
// 如果调用此方法,则不需要调用Close()
nc.Drain()
// 关闭连接
nc.Close()
JetStream
JetStream是NATS内置的持久化系统。nats.go
提供了内置API,可以管理JetStream资源以及发布/消费持久化消息。
// 连接到nats服务器
nc, _ := nats.Connect(nats.DefaultURL)
// 从nats连接创建jetstream上下文
js, _ := jetstream.New(nc)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 获取现有流句柄
stream, _ := js.Stream(ctx, "foo")
// 从流中检索消费者句柄
cons, _ := stream.Consumer(ctx, "cons")
// 在回调中从消费者消费消息
cc, _ := cons.Consume(func(msg jetstream.Msg) {
fmt.Println("Received jetstream message: ", string(msg.Data()))
msg.Ack()
})
defer cc.Stop()
服务API
服务API(micro
)允许您轻松构建NATS服务。服务API目前处于测试版发布阶段。
新认证(Nkeys和用户凭证)
这需要服务器版本>=2.0.0
nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))
TLS
// tls作为方案将默认启用安全连接。这也会验证服务器名称。
nc, err := nats.Connect("tls://nats.demo.io:4443")
// 如果您使用的是自签名证书,您需要设置带有RootCAs的tls.Config。
// 我们提供了一个辅助方法来简化这种情况。
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))
// 如果服务器需要客户端证书,也有辅助函数:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)
// 您也可以提供完整的tls.Config
certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
t.Fatalf("error parsing X509 certificate/key pair: %v", err)
}
config := &tls.Config{
ServerName: opts.Host,
Certificates: []tls.Certificate{cert},
RootCAs: pool,
MinVersion: tls.VersionTLS12,
}
nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}
通配符订阅
// "*"匹配主题中任何级别的任何令牌。
nc.Subscribe("foo.*.baz", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})
nc.Subscribe("foo.bar.*", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})
// ">"匹配主题尾部的任何长度,且只能是最后一个令牌
// 例如'foo.>'将匹配'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})
// 匹配以上所有
nc.Publish("foo.bar.baz", []byte("Hello World"))
队列组
// 所有具有相同队列名称的订阅将形成一个队列组。
// 每条消息将仅传递给每个队列组中的一个订阅者,
// 使用队列语义。您可以根据需要创建任意数量的队列组。
// 普通订阅者将继续按预期工作。
nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
received += 1;
})
高级用法
// 通常,当尝试连接且没有服务器运行时,库会返回错误。
// RetryOnFailedConnect选项将在连接失败时立即将连接设置为重新连接状态。
nc, err := nats.Connect(nats.DefaultURL,
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(10),
nats.ReconnectWait(time.Second),
nats.ReconnectHandler(func(_ *nats.Conn) {
// 注意,这将在第一次异步连接时调用。
}))
if err != nil {
// 即使无法连接也不应返回错误,但您仍然
// 需要检查是否有配置错误。
}
// 刷新连接到服务器,在所有消息处理完成后返回。
nc.Flush()
fmt.Println("All clear!")
// FlushTimeout还指定超时值。
err := nc.FlushTimeout(1*time.Second)
if err != nil {
fmt.Println("All clear!")
} else {
fmt.Println("Flushed timed out!")
}
// 在收到MAX_WANTED条消息后自动取消订阅
const MAX_WANTED = 10
sub, err := nc.Subscribe("foo")
sub.AutoUnsubscribe(MAX_WANTED)
// 多个连接
nc1 := nats.Connect("nats://host1:4222")
nc2 := nats.Connect("nats://host2:4222")
nc1.Subscribe("foo", func(m *Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})
nc2.Publish("foo", []byte("Hello World!"));
集群用法
var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"
nc, err := nats.Connect(servers)
// 可选设置ReconnectWait和MaxReconnect尝试。
// 这个例子意味着每个后端总共10秒。
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))
// 您还可以为重新连接添加一些抖动。
// 此调用将为非TLS连接添加最多500毫秒,为TLS连接添加2秒。
// 如果未指定,库默认为100毫秒和1秒。
nc, err = nats.Connect(servers, nats.ReconnectJitter(500*time.Millisecond, 2*time.Second))
// 您还可以指定自定义重新连接延迟处理程序。如果设置,库将在尝试完
// 其列表中的所有URL时调用它。返回的值将用作总睡眠时间,因此添加您自己的抖动。
// 库将传递它遍历整个列表的次数。
nc, err = nats.Connect(servers, nats.CustomReconnectDelay(func(attempts int) time.Duration {
return someBackoffFunction(attempts)
}))
// 可选禁用服务器池的随机化
nc, err = nats.Connect(servers, nats.DontRandomize())
// 设置回调以在断开连接、重新连接和连接关闭时收到通知。
nc, err = nats.Connect(servers,
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
fmt.Printf("Got disconnected! Reason: %q\n", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
})
)
// 当连接到具有自动发现功能的服务器网格时,
// 您可能需要提供用户名/密码或令牌以便在需要身份验证时
// 连接到该网格中的任何服务器。
// 您将使用新的选项设置器而不是在初始URL中提供凭据:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))
// 基于令牌的身份验证:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))
// 您甚至可以同时传递两者,以防网格中的某个服务器需要令牌而不是用户名和密码。
nc, err = nats.Connect("nats://localhost:4222",
nats.UserInfo("foo", "bar"),
nats.Token("S3cretT0ken"))
// 注意,如果初始URL中指定了凭据,则它们优先于通过选项指定的凭据。
// 例如,在下面的连接调用中,客户端库将使用
// 用户"my"和密码"pwd"连接到localhost:4222,但是,
// 当(重新)连接到作为自动发现一部分获得的不同服务器URL时,
// 它将使用用户名"foo"和密码"bar"。
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))
上下文支持(Go 1.7+)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
nc, err := nats.Connect(nats.DefaultURL)
// 带上下文的请求
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))
// 带上下文的同步订阅者
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)
向后兼容性
在nats.go的开发中,我们致力于保持向后兼容性,确保为所有用户提供稳定可靠的体验。通常,我们遵循标准的Go兼容性指南。但是,澄清我们对某些类型变更的立场很重要:
- 扩展结构:向结构体添加新字段不被视为破坏性变更。
- 向导出接口添加方法:在此项目的上下文中,向公共接口添加新方法也不被视为破坏性变更。需要注意的是,不会向接口添加未导出的方法,允许用户实现它们。
此外,此库始终支持至少2个最新的Go次要版本。例如,如果最新的Go版本是1.22,则库将支持Go 1.21和1.22。
许可证
除非另有说明,NATS源文件根据LICENSE文件中找到的Apache Version 2.0许可证分发。
更多关于golang实现NATS消息系统高性能通信插件库NATS Go Client的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现NATS消息系统高性能通信插件库NATS Go Client的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用NATS Go Client实现高性能消息通信
NATS是一个高性能的开源消息系统,Go语言客户端库提供了简单高效的API来实现发布/订阅模式的消息通信。下面我将详细介绍如何使用NATS Go Client构建高性能的消息通信系统。
1. 安装NATS Go客户端
首先需要安装NATS Go客户端库:
go get github.com/nats-io/nats.go
2. 基本连接与消息发布/订阅
2.1 连接到NATS服务器
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// 连接到本地NATS服务器
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
fmt.Println("Connected to NATS at", nc.ConnectedUrl())
// 发布消息
if err := nc.Publish("updates", []byte("Hello NATS!")); err != nil {
log.Fatal(err)
}
// 订阅主题
sub, err := nc.Subscribe("updates", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
})
if err != nil {
log.Fatal(err)
}
defer sub.Unsubscribe()
// 等待消息
time.Sleep(1 * time.Second)
}
2.2 使用异步订阅
// 异步订阅处理消息
sub, err := nc.Subscribe("updates", func(msg *nats.Msg) {
fmt.Printf("Received on [%s]: '%s'\n", msg.Subject, string(msg.Data))
})
if err != nil {
log.Fatal(err)
}
3. 高级特性
3.1 请求-响应模式
// 服务端
nc.Subscribe("help", func(msg *nats.Msg) {
msg.Respond([]byte("I can help!"))
})
// 客户端
response, err := nc.Request("help", []byte("help me"), 2*time.Second)
if err != nil {
log.Fatal(err)
}
fmt.Println("Got response:", string(response.Data))
3.2 队列组
// 多个订阅者使用相同的队列组名称,消息会被均衡分发
nc.QueueSubscribe("updates", "workers", func(msg *nats.Msg) {
fmt.Printf("Worker received: %s\n", string(msg.Data))
})
3.3 持久化订阅
// 创建JetStream上下文
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// 创建持久化流
_, err = js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.*"},
})
if err != nil {
log.Fatal(err)
}
// 发布持久化消息
_, err = js.Publish("orders.new", []byte("order data"))
if err != nil {
log.Fatal(err)
}
// 创建持久化消费者
sub, err := js.Subscribe("orders.*", func(msg *nats.Msg) {
fmt.Printf("Received JetStream message: %s\n", string(msg.Data))
msg.Ack()
}, nats.Durable("ORDER_PROCESSOR"))
if err != nil {
log.Fatal(err)
}
4. 性能优化技巧
- 连接池:重用连接而不是频繁创建/关闭
- 批量发布:使用
PublishMsg
批量发送消息 - 异步错误处理:设置错误处理回调
// 设置连接选项
opts := []nats.Option{
nats.Name("My NATS Publisher"),
nats.MaxReconnects(10),
nats.ReconnectWait(5 * time.Second),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Disconnected: %v", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Reconnected to %v", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("Connection closed: %v", nc.LastError())
}),
}
nc, err := nats.Connect(nats.DefaultURL, opts...)
5. 完整示例:构建高性能消息处理器
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// 连接NATS服务器
nc, err := nats.Connect(nats.DefaultURL,
nats.MaxReconnects(-1), // 无限重连
nats.ReconnectBufSize(5*1024*1024), // 5MB重连缓冲区
)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// 创建JetStream上下文
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
log.Fatal(err)
}
// 创建流
_, err = js.AddStream(&nats.StreamConfig{
Name: "METRICS",
Subjects: []string{"metrics.>"},
Retention: nats.WorkQueuePolicy,
})
if err != nil {
log.Fatal(err)
}
// 启动10个消费者处理消息
for i := 0; i < 10; i++ {
_, err := js.QueueSubscribe("metrics.>", "METRICS_CONSUMERS", func(msg *nats.Msg) {
// 处理消息
fmt.Printf("Processing metric: %s\n", string(msg.Data))
msg.Ack()
}, nats.DeliverAll(), nats.ManualAck())
if err != nil {
log.Fatal(err)
}
}
// 模拟发布者
go func() {
for i := 0; ; i++ {
metric := fmt.Sprintf("metric_%d", i)
if _, err := js.Publish("metrics.cpu", []byte(metric)); err != nil {
log.Printf("Error publishing: %v", err)
}
time.Sleep(100 * time.Millisecond)
}
}()
// 等待退出信号
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
fmt.Println("Shutting down...")
}
总结
NATS Go客户端提供了简单而强大的API来构建高性能的消息系统。通过合理使用连接管理、JetStream持久化、队列组等特性,可以构建出满足各种场景需求的消息通信系统。关键点包括:
- 合理配置连接选项以提高可靠性
- 根据场景选择同步或异步模式
- 使用JetStream实现消息持久化
- 利用队列组实现负载均衡
- 注意错误处理和重连机制
以上代码示例展示了NATS Go客户端的基本和高级用法,你可以根据实际需求进行调整和扩展。