Golang中RabbitMQ通道在消息数超过QoS限制时冻结的解决方案
Golang中RabbitMQ通道在消息数超过QoS限制时冻结的解决方案 我在排查Go应用程序中遇到的RabbitMQ问题时,偶然看到了这篇帖子(网站 在通道上使用QoS时,如果消息数量超过QoS定义的消息数量,通道会停止)。问题是,当我在通道上使用QoS并且消息数量超过定义的限制时,该通道会停止接收消息。
以下是我正在使用的代码:
package rmqcode
import (
"context"
"fmt"
"log"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func recWorker(worknum int, del *amqp.Delivery) {
println(fmt.Sprintf("start %d", worknum))
time.Sleep(time.Second * 2)
println(fmt.Sprintf("end %d", worknum))
del.Ack(true)
}
func Receiver() {
conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
3, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msg, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var wg sync.WaitGroup
wg.Add(1)
go func() {
var counter = 1
for msgs := range msg {
go recWorker(counter, &msgs)
counter = counter + 1
}
fmt.Println("stopped")
wg.Done()
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
wg.Wait()
}
func Send() {
conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := "Hello World!"
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s\n", body)
}
在我的情况下,我将QoS设置为一个特定的数字,但当消息数量超过这个数字时,消费者就不再处理后续的消息。我已经尝试按照帖子中所示运行代码,但仍然面临同样的问题。我怀疑这与消息的确认方式或预取计数有关,但我不完全确定。
有没有其他人在使用RabbitMQ和QoS时遇到过这个问题?对于如何解决这个问题或如何正确处理消息队列而不导致通道冻结,有什么建议吗?
更多关于Golang中RabbitMQ通道在消息数超过QoS限制时冻结的解决方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html
我也遇到了同样的问题,我已经在GitHub上提出了这个问题,Channel returned by consume function auto close when QoS defined & number of message in queue is > QoS value · Issue #296 · rabbitmq/amqp091-go · GitHub
更多关于Golang中RabbitMQ通道在消息数超过QoS限制时冻结的解决方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
问题在于消息确认的并发处理方式。当使用go recWorker(counter, &msgs)启动goroutine时,传递的是循环变量的地址,这会导致数据竞争。同时,QoS设置为3意味着通道最多只能有3个未确认的消息,但goroutine的并发执行可能超出这个限制。
以下是修正后的消费者代码:
func Receiver() {
conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 设置QoS:最多3个未确认消息
err = ch.Qos(
3, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
// 使用工作池限制并发goroutine数量
const maxWorkers = 3
semaphore := make(chan struct{}, maxWorkers)
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for delivery := range msgs {
wg.Add(1)
semaphore <- struct{}{} // 获取信号量
go func(d amqp.Delivery) {
defer wg.Done()
defer func() { <-semaphore }() // 释放信号量
select {
case <-ctx.Done():
d.Nack(false, true) // 重新入队
return
default:
}
// 处理消息
workerID := time.Now().UnixNano()
log.Printf("start worker %d", workerID)
time.Sleep(time.Second * 2)
log.Printf("end worker %d", workerID)
// 确认消息
if err := d.Ack(false); err != nil {
log.Printf("Failed to ack message: %v", err)
}
}(delivery)
}
wg.Wait()
log.Println("Consumer stopped")
}
关键修改:
-
避免循环变量捕获问题:将
delivery作为参数传递给goroutine,确保每个goroutine获得自己的消息副本。 -
使用信号量控制并发:创建大小为3的信号量通道,确保同时运行的goroutine数量不超过QoS限制。
-
添加上下文支持:允许优雅地停止消费者。
-
改进错误处理:在确认消息时检查错误。
-
移除不必要的WaitGroup:直接使用通道循环处理消息。
如果问题仍然存在,可以添加连接恢复机制:
func connectRabbitMQ(url string) (*amqp.Connection, error) {
var conn *amqp.Connection
var err error
for i := 0; i < 5; i++ {
conn, err = amqp.Dial(url)
if err == nil {
return conn, nil
}
time.Sleep(time.Second * time.Duration(i*2))
}
return nil, err
}
这样确保在连接断开时能够重新连接,避免通道永久冻结。

