golang实现NATS消息系统高性能通信插件库NATS Go Client的使用

Golang实现NATS消息系统高性能通信插件库NATS Go Client的使用

简介

NATS Go Client是一个用于NATS消息系统的Go语言客户端库。NATS是一个高性能的云原生消息系统,适用于分布式系统、微服务架构和物联网应用。

安装

# 获取最新发布的Go客户端
go get github.com/nats-io/nats.go@latest

# 获取特定版本
go get github.com/nats-io/nats.go@v1.43.0

# 注意NATS Server的最新主版本是v2
go get github.com/nats-io/nats-server/v2@latest

基本用法

import "github.com/nats-io/nats.go"

// 连接到服务器
nc, _ := nats.Connect(nats.DefaultURL)

// 简单发布者
nc.Publish("foo", []byte("Hello World"))

// 简单异步订阅者
nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

// 响应请求消息
nc.Subscribe("request", func(m *nats.Msg) {
    m.Respond([]byte("answer is 42"))
})

// 简单同步订阅者
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// 通道订阅者
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch

// 取消订阅
sub.Unsubscribe()

// 排空
sub.Drain()

// 请求
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// 回复
nc.Subscribe("help", func(m *nats.Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))
})

// 排空连接(响应者首选)
// 如果调用此方法,则不需要调用Close()
nc.Drain()

// 关闭连接
nc.Close()

JetStream

JetStream是NATS内置的持久化系统。nats.go提供了内置API,可以管理JetStream资源以及发布/消费持久化消息。

// 连接到nats服务器
nc, _ := nats.Connect(nats.DefaultURL)

// 从nats连接创建jetstream上下文
js, _ := jetstream.New(nc)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// 获取现有流句柄
stream, _ := js.Stream(ctx, "foo")

// 从流中检索消费者句柄
cons, _ := stream.Consumer(ctx, "cons")

// 在回调中从消费者消费消息
cc, _ := cons.Consume(func(msg jetstream.Msg) {
    fmt.Println("Received jetstream message: ", string(msg.Data()))
    msg.Ack()
})
defer cc.Stop()

服务API

服务API(micro)允许您轻松构建NATS服务。服务API目前处于测试版发布阶段。

新认证(Nkeys和用户凭证)

这需要服务器版本>=2.0.0

nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))

TLS

// tls作为方案将默认启用安全连接。这也会验证服务器名称。
nc, err := nats.Connect("tls://nats.demo.io:4443")

// 如果您使用的是自签名证书,您需要设置带有RootCAs的tls.Config。
// 我们提供了一个辅助方法来简化这种情况。
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))

// 如果服务器需要客户端证书,也有辅助函数:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)

// 您也可以提供完整的tls.Config
certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
    t.Fatalf("error parsing X509 certificate/key pair: %v", err)
}

config := &tls.Config{
    ServerName: 	opts.Host,
    Certificates: 	[]tls.Certificate{cert},
    RootCAs:    	pool,
    MinVersion: 	tls.VersionTLS12,
}

nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
	t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}

通配符订阅

// "*"匹配主题中任何级别的任何令牌。
nc.Subscribe("foo.*.baz", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

nc.Subscribe("foo.bar.*", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

// ">"匹配主题尾部的任何长度,且只能是最后一个令牌
// 例如'foo.>'将匹配'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

// 匹配以上所有
nc.Publish("foo.bar.baz", []byte("Hello World"))

队列组

// 所有具有相同队列名称的订阅将形成一个队列组。
// 每条消息将仅传递给每个队列组中的一个订阅者,
// 使用队列语义。您可以根据需要创建任意数量的队列组。
// 普通订阅者将继续按预期工作。

nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
  received += 1;
})

高级用法

// 通常,当尝试连接且没有服务器运行时,库会返回错误。
// RetryOnFailedConnect选项将在连接失败时立即将连接设置为重新连接状态。
nc, err := nats.Connect(nats.DefaultURL,
    nats.RetryOnFailedConnect(true),
    nats.MaxReconnects(10),
    nats.ReconnectWait(time.Second),
    nats.ReconnectHandler(func(_ *nats.Conn) {
        // 注意,这将在第一次异步连接时调用。
    }))
if err != nil {
    // 即使无法连接也不应返回错误,但您仍然
    // 需要检查是否有配置错误。
}

// 刷新连接到服务器,在所有消息处理完成后返回。
nc.Flush()
fmt.Println("All clear!")

// FlushTimeout还指定超时值。
err := nc.FlushTimeout(1*time.Second)
if err != nil {
    fmt.Println("All clear!")
} else {
    fmt.Println("Flushed timed out!")
}

// 在收到MAX_WANTED条消息后自动取消订阅
const MAX_WANTED = 10
sub, err := nc.Subscribe("foo")
sub.AutoUnsubscribe(MAX_WANTED)

// 多个连接
nc1 := nats.Connect("nats://host1:4222")
nc2 := nats.Connect("nats://host2:4222")

nc1.Subscribe("foo", func(m *Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

nc2.Publish("foo", []byte("Hello World!"));

集群用法

var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"

nc, err := nats.Connect(servers)

// 可选设置ReconnectWait和MaxReconnect尝试。
// 这个例子意味着每个后端总共10秒。
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))

// 您还可以为重新连接添加一些抖动。
// 此调用将为非TLS连接添加最多500毫秒,为TLS连接添加2秒。
// 如果未指定,库默认为100毫秒和1秒。
nc, err = nats.Connect(servers, nats.ReconnectJitter(500*time.Millisecond, 2*time.Second))

// 您还可以指定自定义重新连接延迟处理程序。如果设置,库将在尝试完
// 其列表中的所有URL时调用它。返回的值将用作总睡眠时间,因此添加您自己的抖动。
// 库将传递它遍历整个列表的次数。
nc, err = nats.Connect(servers, nats.CustomReconnectDelay(func(attempts int) time.Duration {
    return someBackoffFunction(attempts)
}))

// 可选禁用服务器池的随机化
nc, err = nats.Connect(servers, nats.DontRandomize())

// 设置回调以在断开连接、重新连接和连接关闭时收到通知。
nc, err = nats.Connect(servers,
	nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
		fmt.Printf("Got disconnected! Reason: %q\n", err)
	}),
	nats.ReconnectHandler(func(nc *nats.Conn) {
		fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
	}),
	nats.ClosedHandler(func(nc *nats.Conn) {
		fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
	})
)

// 当连接到具有自动发现功能的服务器网格时,
// 您可能需要提供用户名/密码或令牌以便在需要身份验证时
// 连接到该网格中的任何服务器。
// 您将使用新的选项设置器而不是在初始URL中提供凭据:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))

// 基于令牌的身份验证:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))

// 您甚至可以同时传递两者,以防网格中的某个服务器需要令牌而不是用户名和密码。
nc, err = nats.Connect("nats://localhost:4222",
    nats.UserInfo("foo", "bar"),
    nats.Token("S3cretT0ken"))

// 注意,如果初始URL中指定了凭据,则它们优先于通过选项指定的凭据。
// 例如,在下面的连接调用中,客户端库将使用
// 用户"my"和密码"pwd"连接到localhost:4222,但是,
// 当(重新)连接到作为自动发现一部分获得的不同服务器URL时,
// 它将使用用户名"foo"和密码"bar"。
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))

上下文支持(Go 1.7+)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

nc, err := nats.Connect(nats.DefaultURL)

// 带上下文的请求
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))

// 带上下文的同步订阅者
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)

向后兼容性

在nats.go的开发中,我们致力于保持向后兼容性,确保为所有用户提供稳定可靠的体验。通常,我们遵循标准的Go兼容性指南。但是,澄清我们对某些类型变更的立场很重要:

  • 扩展结构:向结构体添加新字段不被视为破坏性变更。
  • 向导出接口添加方法:在此项目的上下文中,向公共接口添加新方法也不被视为破坏性变更。需要注意的是,不会向接口添加未导出的方法,允许用户实现它们。

此外,此库始终支持至少2个最新的Go次要版本。例如,如果最新的Go版本是1.22,则库将支持Go 1.21和1.22。

许可证

除非另有说明,NATS源文件根据LICENSE文件中找到的Apache Version 2.0许可证分发。


更多关于golang实现NATS消息系统高性能通信插件库NATS Go Client的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现NATS消息系统高性能通信插件库NATS Go Client的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用NATS Go Client实现高性能消息通信

NATS是一个高性能的开源消息系统,Go语言客户端库提供了简单高效的API来实现发布/订阅模式的消息通信。下面我将详细介绍如何使用NATS Go Client构建高性能的消息通信系统。

1. 安装NATS Go客户端

首先需要安装NATS Go客户端库:

go get github.com/nats-io/nats.go

2. 基本连接与消息发布/订阅

2.1 连接到NATS服务器

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// 连接到本地NATS服务器
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	fmt.Println("Connected to NATS at", nc.ConnectedUrl())

	// 发布消息
	if err := nc.Publish("updates", []byte("Hello NATS!")); err != nil {
		log.Fatal(err)
	}

	// 订阅主题
	sub, err := nc.Subscribe("updates", func(msg *nats.Msg) {
		fmt.Printf("Received a message: %s\n", string(msg.Data))
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	// 等待消息
	time.Sleep(1 * time.Second)
}

2.2 使用异步订阅

// 异步订阅处理消息
sub, err := nc.Subscribe("updates", func(msg *nats.Msg) {
    fmt.Printf("Received on [%s]: '%s'\n", msg.Subject, string(msg.Data))
})
if err != nil {
    log.Fatal(err)
}

3. 高级特性

3.1 请求-响应模式

// 服务端
nc.Subscribe("help", func(msg *nats.Msg) {
    msg.Respond([]byte("I can help!"))
})

// 客户端
response, err := nc.Request("help", []byte("help me"), 2*time.Second)
if err != nil {
    log.Fatal(err)
}
fmt.Println("Got response:", string(response.Data))

3.2 队列组

// 多个订阅者使用相同的队列组名称,消息会被均衡分发
nc.QueueSubscribe("updates", "workers", func(msg *nats.Msg) {
    fmt.Printf("Worker received: %s\n", string(msg.Data))
})

3.3 持久化订阅

// 创建JetStream上下文
js, err := nc.JetStream()
if err != nil {
    log.Fatal(err)
}

// 创建持久化流
_, err = js.AddStream(&nats.StreamConfig{
    Name:     "ORDERS",
    Subjects: []string{"orders.*"},
})
if err != nil {
    log.Fatal(err)
}

// 发布持久化消息
_, err = js.Publish("orders.new", []byte("order data"))
if err != nil {
    log.Fatal(err)
}

// 创建持久化消费者
sub, err := js.Subscribe("orders.*", func(msg *nats.Msg) {
    fmt.Printf("Received JetStream message: %s\n", string(msg.Data))
    msg.Ack()
}, nats.Durable("ORDER_PROCESSOR"))
if err != nil {
    log.Fatal(err)
}

4. 性能优化技巧

  1. 连接池:重用连接而不是频繁创建/关闭
  2. 批量发布:使用PublishMsg批量发送消息
  3. 异步错误处理:设置错误处理回调
// 设置连接选项
opts := []nats.Option{
    nats.Name("My NATS Publisher"),
    nats.MaxReconnects(10),
    nats.ReconnectWait(5 * time.Second),
    nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
        log.Printf("Disconnected: %v", err)
    }),
    nats.ReconnectHandler(func(nc *nats.Conn) {
        log.Printf("Reconnected to %v", nc.ConnectedUrl())
    }),
    nats.ClosedHandler(func(nc *nats.Conn) {
        log.Printf("Connection closed: %v", nc.LastError())
    }),
}

nc, err := nats.Connect(nats.DefaultURL, opts...)

5. 完整示例:构建高性能消息处理器

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// 连接NATS服务器
	nc, err := nats.Connect(nats.DefaultURL,
		nats.MaxReconnects(-1), // 无限重连
		nats.ReconnectBufSize(5*1024*1024), // 5MB重连缓冲区
	)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// 创建JetStream上下文
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatal(err)
	}

	// 创建流
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "METRICS",
		Subjects: []string{"metrics.>"},
		Retention: nats.WorkQueuePolicy,
	})
	if err != nil {
		log.Fatal(err)
	}

	// 启动10个消费者处理消息
	for i := 0; i < 10; i++ {
		_, err := js.QueueSubscribe("metrics.>", "METRICS_CONSUMERS", func(msg *nats.Msg) {
			// 处理消息
			fmt.Printf("Processing metric: %s\n", string(msg.Data))
			msg.Ack()
		}, nats.DeliverAll(), nats.ManualAck())
		if err != nil {
			log.Fatal(err)
		}
	}

	// 模拟发布者
	go func() {
		for i := 0; ; i++ {
			metric := fmt.Sprintf("metric_%d", i)
			if _, err := js.Publish("metrics.cpu", []byte(metric)); err != nil {
				log.Printf("Error publishing: %v", err)
			}
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// 等待退出信号
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	fmt.Println("Shutting down...")
}

总结

NATS Go客户端提供了简单而强大的API来构建高性能的消息系统。通过合理使用连接管理、JetStream持久化、队列组等特性,可以构建出满足各种场景需求的消息通信系统。关键点包括:

  1. 合理配置连接选项以提高可靠性
  2. 根据场景选择同步或异步模式
  3. 使用JetStream实现消息持久化
  4. 利用队列组实现负载均衡
  5. 注意错误处理和重连机制

以上代码示例展示了NATS Go客户端的基本和高级用法,你可以根据实际需求进行调整和扩展。

回到顶部