Golang Go语言中 NATS-STREAMING 问题:先 publish 消息再启动 QueueSubscriber,相同队列仅一个 Subscriber 能消费消息?
Golang Go语言中 NATS-STREAMING 问题:先 publish 消息再启动 QueueSubscriber,相同队列仅一个 Subscriber 能消费消息?
PUBLISHER 代码:
func main() {
nc,err := stan.Connect("test-cluster","idc",stan.NatsURL("nats://127.0.0.1:4222"))
if err != nil{
panic(err)
}
fmt.Println("connect succ")
for i:=0;i<10;i++{
fmt.Println("publishing:",i)
err := nc.Publish("tp1",[]byte(strconv.Itoa(i)))
if err != nil{
panic(err)
}
}
nc.Close()
}
QueueSubscriber 代码:
func main() {
nc,err := stan.Connect("test-cluster","subscriber",stan.NatsURL("nats://localhost:4222"))
if err != nil{
panic(err)
}
defer nc.Close()
subs := make([]stan.Subscription,3)
for i:=0;i<3;i++{
workername := "worker"+strconv.Itoa(i)
fmt.Println(fmt.Sprintf("QueueSubscribe %s start",workername))
sub,err := nc.QueueSubscribe("tp1","ch1", func(msg *stan.Msg) {
fmt.Println(workername,"get msg:",string(msg.Data),"start doing something")
time.Sleep(1*time.Second)
},stan.DurableName("subscriber"),stan.AckWait(time.Hour*24))
if err != nil{
panic(err)
}
subs[i] = sub
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
select{
case <- c:
fmt.Println("Subscriber CLOSE")
for i,_ := range subs{
subs[i].Close()
}
nc.Close()
fmt.Println("quit")
}
}
Publisher 输出:
connect succ
publishing: 0
publishing: 1
publishing: 2
publishing: 3
publishing: 4
publishing: 5
publishing: 6
publishing: 7
publishing: 8
publishing: 9
QueueSubscriber 输出:
QueueSubscribe worker0 start
QueueSubscribe worker1 start
worker0 get msg: 0 start doing something
QueueSubscribe worker2 start
worker0 get msg: 1 start doing something
worker0 get msg: 2 start doing something
worker0 get msg: 3 start doing something
worker0 get msg: 4 start doing something
worker0 get msg: 5 start doing something
worker0 get msg: 6 start doing something
worker0 get msg: 7 start doing something
worker0 get msg: 8 start doing something
worker0 get msg: 9 start doing something
请问朋友们是否有遇到过一样的问题呢?谢谢大家
更多关于Golang Go语言中 NATS-STREAMING 问题:先 publish 消息再启动 QueueSubscriber,相同队列仅一个 Subscriber 能消费消息?的实战教程也可以访问 https://www.itying.com/category-94-b0.html
这是特性,queue 模式. 如果想每个订阅者都收到,设置不同的 queue 名字或普通方式订阅就行. 可以看下我的博客 https://imhanjm.com/2018/02/17/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3nats%20&%20nats%20streaming/
更多关于Golang Go语言中 NATS-STREAMING 问题:先 publish 消息再启动 QueueSubscriber,相同队列仅一个 Subscriber 能消费消息?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
额…不是,我的意思是,为啥其他的 worker 不工作,只有 worker0 在工作
你这是同一个连接啊 一般 queueSub 是不同的进程即不同的连接 你试试给每个 worker 创建一个连接.
我搞清楚了…是因为没有设置 MaxInflight 值的问题,派出的任务都最先启动的 worker 给接收了。设置 MaxInflight 值为 1 实现了正常分 worker 执行。谢谢啦
在Golang中使用NATS Streaming时,关于您提到的先发布消息再启动QueueSubscriber的问题,确实存在一些关键行为需要明确。
NATS Streaming支持队列订阅(QueueSubscriber),这意味着同一队列内的多个订阅者中只有一个会接收到每条发布的消息,实现了消息负载的均衡和处理的去重。然而,这个机制依赖于NATS Streaming服务器的状态管理和订阅者的注册。
如果您先发布消息,再启动QueueSubscriber,只要消息在NATS Streaming服务器的存储(如内存或持久化存储)中未被清除,新加入的QueueSubscriber仍然能够接收到这些消息。NATS Streaming服务器会记录尚未被队列中任何订阅者处理的消息,并确保每个消息只被队列中的一个订阅者处理。
重要的是,NATS Streaming服务器的配置(如消息持久化策略、消息存储大小等)会影响消息的可获取性。如果服务器配置为不持久化消息,并且消息在内存中因达到存储限制而被丢弃,那么新加入的订阅者将无法接收到这些消息。
因此,虽然您可以在发布消息后再启动QueueSubscriber,但为了确保消息的可靠传递,建议合理配置NATS Streaming服务器,并根据业务需求设计消息重试和死信队列等容错机制。这样,即使面对订阅者延迟启动或故障恢复的场景,也能保证消息处理的完整性和可靠性。