Golang中RabbitMQ通道在消息数超过QoS限制时冻结的解决方案

Golang中RabbitMQ通道在消息数超过QoS限制时冻结的解决方案 我在排查Go应用程序中遇到的RabbitMQ问题时,偶然看到了这篇帖子(网站 在通道上使用QoS时,如果消息数量超过QoS定义的消息数量,通道会停止)。问题是,当我在通道上使用QoS并且消息数量超过定义的限制时,该通道会停止接收消息。

以下是我正在使用的代码:

package rmqcode

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func recWorker(worknum int, del *amqp.Delivery) {
    println(fmt.Sprintf("start %d", worknum))
    time.Sleep(time.Second * 2)
    println(fmt.Sprintf("end %d", worknum))
    del.Ack(true)
}

func Receiver() {
    conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        3,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msg, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        var counter = 1
        for msgs := range msg {
            go recWorker(counter, &msgs)
            counter = counter + 1
        }
        fmt.Println("stopped")
        wg.Done()
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    wg.Wait()
}

func Send() {
    conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    body := "Hello World!"
    err = ch.PublishWithContext(ctx,
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s\n", body)
}

在我的情况下,我将QoS设置为一个特定的数字,但当消息数量超过这个数字时,消费者就不再处理后续的消息。我已经尝试按照帖子中所示运行代码,但仍然面临同样的问题。我怀疑这与消息的确认方式或预取计数有关,但我不完全确定。

有没有其他人在使用RabbitMQ和QoS时遇到过这个问题?对于如何解决这个问题或如何正确处理消息队列而不导致通道冻结,有什么建议吗?


更多关于Golang中RabbitMQ通道在消息数超过QoS限制时冻结的解决方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

更多关于Golang中RabbitMQ通道在消息数超过QoS限制时冻结的解决方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


问题在于消息确认的并发处理方式。当使用go recWorker(counter, &msgs)启动goroutine时,传递的是循环变量的地址,这会导致数据竞争。同时,QoS设置为3意味着通道最多只能有3个未确认的消息,但goroutine的并发执行可能超出这个限制。

以下是修正后的消费者代码:

func Receiver() {
    conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    // 设置QoS:最多3个未确认消息
    err = ch.Qos(
        3,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    // 使用工作池限制并发goroutine数量
    const maxWorkers = 3
    semaphore := make(chan struct{}, maxWorkers)
    
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    for delivery := range msgs {
        wg.Add(1)
        semaphore <- struct{}{} // 获取信号量
        
        go func(d amqp.Delivery) {
            defer wg.Done()
            defer func() { <-semaphore }() // 释放信号量
            
            select {
            case <-ctx.Done():
                d.Nack(false, true) // 重新入队
                return
            default:
            }
            
            // 处理消息
            workerID := time.Now().UnixNano()
            log.Printf("start worker %d", workerID)
            time.Sleep(time.Second * 2)
            log.Printf("end worker %d", workerID)
            
            // 确认消息
            if err := d.Ack(false); err != nil {
                log.Printf("Failed to ack message: %v", err)
            }
        }(delivery)
    }
    
    wg.Wait()
    log.Println("Consumer stopped")
}

关键修改:

  1. 避免循环变量捕获问题:将delivery作为参数传递给goroutine,确保每个goroutine获得自己的消息副本。

  2. 使用信号量控制并发:创建大小为3的信号量通道,确保同时运行的goroutine数量不超过QoS限制。

  3. 添加上下文支持:允许优雅地停止消费者。

  4. 改进错误处理:在确认消息时检查错误。

  5. 移除不必要的WaitGroup:直接使用通道循环处理消息。

如果问题仍然存在,可以添加连接恢复机制:

func connectRabbitMQ(url string) (*amqp.Connection, error) {
    var conn *amqp.Connection
    var err error
    
    for i := 0; i < 5; i++ {
        conn, err = amqp.Dial(url)
        if err == nil {
            return conn, nil
        }
        time.Sleep(time.Second * time.Duration(i*2))
    }
    return nil, err
}

这样确保在连接断开时能够重新连接,避免通道永久冻结。

回到顶部