Golang如何连接NATS服务器?

Golang如何连接NATS服务器? 我正在尝试使用 NATS 构建一个发布/订阅服务器。第一步是连接到 NATS 服务器,但在文档中没有找到可用的代码。以下是我尝试过的内容:

package main

import (
	"github.com/nats-io/nats.go"
	"fmt"
)

func main() {
	// Connect to a server
	nc, err := nats.Connect("tls://nats.go4webdev.org:4222")
    if err!=nil{
		fmt.Println(err)
	}
	fmt.Println(nc)
}

无论我是否改为调用 nc, err := nats.Connect("nats://nats.go4webdev.org:4222")

从服务器得到的回复是:

dial tcp 104.21.65.248:4222: i/o timeout

如果我省略端口号,会得到这个错误:

dial tcp [2606:4700:3031::ac43:c3d0]:4222: connect: no route to host

NATS 服务器正在运行。端口扫描显示如下:

Port Scanning host: 94.237.X.X Open TCP Port: 4222

NATS 服务器位于 Cloudflare Flexible SSL(代理)上。

我运行了命令 service nats status

> Oct 08 16:02:24 go4webdev.org nats-server[59191]: [59191] 2022/10/08 16:02:24.583853 [ERR] 194.218.x.x:50015 - cid:10 - Client parser ERROR, state=0, i=0: proto='"\x16\x03\x01\x00\xb9\x01\x00\x00\xb5\x03\x01B\xe7\x19\xaa\f\x85\xaf\x81 \xf3\xa6E:)\x88\x93\xe5oKe\x9a"...'
> Oct 08 16:14:00 go4webdev.org nats-server[59191]: [59191] 2022/10/08 16:14:00.470805 [ERR] 127.0.0.1:51756 - cid:12 - Client parser ERROR, state=0, i=0: proto='"GET / HTTP/1.0\r\nHost: localhost:"...'
> Oct 08 16:16:19 go4webdev.org nats-server[59191]: [59191] 2022/10/08 16:16:19.501242 [ERR] 127.0.0.1:51758 - cid:13 - Client parser ERROR, state=0, i=0: proto='"GET / HTTP/1.0\r\nHost: localhost:"...'
> Oct 08 16:18:44 go4webdev.org nats-server[59191]: [59191] 2022/10/08 16:18:44.130856 [ERR] [::1]:41494 - cid:14 - Client parser ERROR, state=0, i=0: proto='"GET / HTTP/1.0\r\nHost: localhost:"...'

有什么线索可以连接到 NATS 服务器吗?我做错了什么?


更多关于Golang如何连接NATS服务器?的实战教程也可以访问 https://www.itying.com/category-94-b0.html

9 回复

为什么感到困惑?你是否按照 docs.nats.io 上的教程操作了? 你的具体使用场景是什么?

更多关于Golang如何连接NATS服务器?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你确定NATS服务器启用了TLS吗?你试过用 nats:// 而不是 tls:// 吗?请记住,如果启用了TLS,使用 nats:// 会自动切换到TLS。

嗯,教程展示了如何进行订阅。

一般来说,除非你使用了像 Jetstream 这样的持久化机制,否则所有的发布/订阅系统都存在“延迟订阅”的问题。这意味着你无法接收到在你订阅之前就已经发布的消息。

geosoft1:

你试过GitHub页面上的示例吗?

这是那里的示例:

// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)

nats: no servers available for connection

据我所知,这意味着 127.0.0.1:4222。但客户端在我的桌面上,而NATS服务器在一个远程服务器上。还是我漏掉了什么?

你试过该项目GitHub页面上的示例吗?

GitHub

GitHub - nats-io/nats.go: Golang client for NATS, the cloud native messaging…

缩略图

用于NATS(云原生消息系统)的Golang客户端。 - GitHub - nats-io/nats.go: 用于NATS(云原生消息系统)的Golang客户端。

我遵循了这个“教程”并将其修改为使用 nats.defaultURL

好吧,如果你是新手,永远不要轻信教程 🙂

以下是 GitHub 上的代码:

// Connect to a server
nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("foo", []byte("Hello World"))

// Simple Async Subscriber
nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

但是你必须先订阅再发布…

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Subscribe
	sub, err := nc.SubscribeSync("foo")
	if err != nil {
		log.Fatal(err)
	}

	// Simple Publisher
	nc.Publish("foo", []byte("Hello World"))

	// Wait for a message
	msg, err := sub.NextMsg(10 * time.Second)
	if err != nil {
		log.Fatal(err)
	}

	// Use the response
	log.Printf("Reply: %s", msg.Data)
}

Reply: Hello World

为什么这很令人困惑?你按照 docs.nats.io 上的教程操作了吗?

我遵循了这个“教程”并将其修改为 nats.defaultURL

Go Playground - The Go Programming Language

根据以下信息,nats-server 已经启动并正在运行:

[1975] 2022/10/13 10:29:23.246236 [INF] Starting nats-server
[1975] 2022/10/13 10:29:23.246350 [INF]   Version:  2.9.2
[1975] 2022/10/13 10:29:23.246354 [INF]   Git:      [6d81dde]
[1975] 2022/10/13 10:29:23.246357 [INF]   Name:     NBACHDMRCRTQN255JFUU7XDLWSNLVCML7AIDTZWGIG3LGMMSWZETRXD6
[1975] 2022/10/13 10:29:23.246359 [INF]   ID:       NBACHDMRCRTQN255JFUU7XDLWSNLVCML7AIDTZWGIG3LGMMSWZETRXD6
[1975] 2022/10/13 10:29:23.249624 [INF] Listening for client connections on 0.0.0.0:4222
[1975] 2022/10/13 10:29:23.250751 [INF] Server is ready

但是当我运行教程代码时,至少会得到一个错误(第一次):

2022/10/13 10:44:31 nats: timeout

这就是为什么我认为 NATS 非常令人困惑。😏 甚至连他们自己的教程都不起作用。

你的使用场景是什么?

我的使用场景基本上是实现一个通知,告诉我发生了某些事情,并且这个通知是专门针对我的。没有其他人。

image

你确定 NATS 服务器启用了 TLS 吗?

在这个例子中我使用了 nats://,但我仍然不明白它是如何工作的。 以下是我的尝试:

服务器正在运行

./nats-server
[2819] 2022/10/11 16:24:33.375813 [INF] Starting nats-server
[2819] 2022/10/11 16:24:33.375911 [INF]   Version:  2.9.2
[2819] 2022/10/11 16:24:33.375915 [INF]   Git:      [6d81dde]
[2819] 2022/10/11 16:24:33.375920 [INF]   Name:     NDXYJG4CQZMVFASH6GZDSUQ2SY2DXD3NLVIILEQP2YTIP7XWEDHHF7OT
[2819] 2022/10/11 16:24:33.375924 [INF]   ID:       NDXYJG4CQZMVFASH6GZDSUQ2SY2DXD3NLVIILEQP2YTIP7XWEDHHF7OT
[2819] 2022/10/11 16:24:33.377446 [INF] Listening for client connections on 0.0.0.0:4222
[2819] 2022/10/11 16:24:33.378535 [INF] Server is ready

端口扫描显示这在本地是开放的:开放 TCP 端口:4222

运行客户端

package main

import (
	"fmt"
	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to a server
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		fmt.Println("no connection")
		fmt.Println(err)
	}
	defer nc.Close()
	fmt.Println("connected? -- no errors reported but not sure")

	// Simple Publisher
	err = nc.Publish("foo", []byte("Hello World"))
	if err != nil {
		fmt.Println("nothing published")
		fmt.Println(err)
	}

	// Simple Async Subscriber
	msg, err := nc.Subscribe("foo", func(msg *nats.Msg) {
		fmt.Println("subscribe--------------------")
		fmt.Println(msg)
		fmt.Println("Received a message: %s\n", string(msg.Data))
	})
	
	if err != nil {
		fmt.Println("nothing subscribed")
		fmt.Println(err)
	}
	fmt.Println("subscribe msg:")
	fmt.Println(msg)

	// Close connection
	//nc.Close()
}

产生以下“错误”

connected? – no errors reported but not sure subscribe msg: &{{0 0} 1 foo 0 0 0xc000198000 0x1249a80 false false false 0 0xc00020e000 0 0 0 0 524288 67108864 0}

我发现 NATS 极其令人困惑。有没有一个更好的工具,能以更简单的方式处理 PubSub 和独立的“房间”或“客户端”?

连接NATS服务器时遇到连接超时问题,通常与网络配置或TLS设置有关。以下是几种连接NATS服务器的示例代码:

1. 基础连接(无TLS):

package main

import (
	"fmt"
	"log"
	"github.com/nats-io/nats.go"
)

func main() {
	// 连接到NATS服务器
	nc, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	
	fmt.Println("Connected to NATS server")
}

2. 带选项的连接:

func main() {
	opts := []nats.Option{
		nats.Name("My NATS Client"),
		nats.MaxReconnects(10),
		nats.ReconnectWait(5 * time.Second),
	}
	
	nc, err := nats.Connect("nats://localhost:4222", opts...)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	
	fmt.Println("Connected with custom options")
}

3. TLS连接(如果服务器启用了TLS):

func main() {
	// 加载TLS配置
	tlsConfig := &tls.Config{
		InsecureSkipVerify: true, // 仅用于测试环境
	}
	
	nc, err := nats.Connect("tls://localhost:4222", 
		nats.Secure(tlsConfig))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	
	fmt.Println("Connected via TLS")
}

4. 带认证的连接:

func main() {
	// Token认证
	nc, err := nats.Connect("nats://localhost:4222",
		nats.Token("my-token"))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	
	// 或用户名/密码认证
	nc2, err := nats.Connect("nats://localhost:4222",
		nats.UserInfo("user", "password"))
	if err != nil {
		log.Fatal(err)
	}
	defer nc2.Close()
	
	fmt.Println("Connected with authentication")
}

5. 连接集群:

func main() {
	servers := []string{
		"nats://server1:4222",
		"nats://server2:4222",
		"nats://server3:4222",
	}
	
	nc, err := nats.Connect(strings.Join(servers, ","))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	
	fmt.Println("Connected to NATS cluster")
}

6. 带连接状态处理的连接:

func main() {
	nc, err := nats.Connect("nats://localhost:4222",
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			fmt.Printf("Disconnected: %v\n", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			fmt.Println("Reconnected")
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			fmt.Println("Connection closed")
		}))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	
	fmt.Println("Connected with event handlers")
}

从错误信息看,连接超时可能是由于:

  1. 服务器防火墙阻止了连接
  2. NATS服务器配置了TLS但客户端未正确配置
  3. Cloudflare代理可能修改了连接

建议先尝试本地NATS服务器连接测试:

func main() {
	// 测试本地连接
	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		fmt.Printf("Local connection failed: %v\n", err)
		
		// 尝试带超时的连接
		nc, err = nats.Connect("nats://localhost:4222",
			nats.Timeout(10*time.Second))
		if err != nil {
			log.Fatal(err)
		}
	}
	defer nc.Close()
	
	fmt.Printf("Connected to %s\n", nc.ConnectedUrl())
}

检查NATS服务器配置,确保监听地址正确且防火墙允许4222端口连接。

回到顶部