Golang中RabbitMQ通道在使用QoS时因消息数量超限导致通道停止的问题

Golang中RabbitMQ通道在使用QoS时因消息数量超限导致通道停止的问题

package rmqcode

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func recWorker(worknum int, del *amqp.Delivery) {
    println(fmt.Sprintf("start %d", worknum))
    time.Sleep(time.Second * 2)
    println(fmt.Sprintf("end %d", worknum))
    del.Ack(true)
}

func Receiver() {
    conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        3,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msg, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        var counter = 1
        for msgs := range msg {
            go recWorker(counter, &msgs)
            counter = counter + 1
        }
        fmt.Println("stopped")
        wg.Done()
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    wg.Wait()
}

func Send() {
    conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    body := "Hello World!"
    err = ch.PublishWithContext(ctx,
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s\n", body)
}

当队列中的消息超过3条时,接收函数启动,在处理完3条消息后,消息通道关闭。这是预期的行为,还是我遗漏了什么?


更多关于Golang中RabbitMQ通道在使用QoS时因消息数量超限导致通道停止的问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中RabbitMQ通道在使用QoS时因消息数量超限导致通道停止的问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


问题出在消息确认的处理方式上。当您设置 Qos(prefetchCount=3) 后,通道最多只能有3条未确认的消息。您的代码在处理消息时启动了新的goroutine,但主goroutine继续从通道消费消息,导致未确认消息数量超过限制。

以下是修复后的代码示例:

func Receiver() {
    conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        3,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    var wg sync.WaitGroup
    workerPool := make(chan struct{}, 3) // 限制并发worker数量

    for d := range msgs {
        wg.Add(1)
        workerPool <- struct{}{} // 获取worker槽位
        
        go func(delivery amqp.Delivery) {
            defer wg.Done()
            defer func() { <-workerPool }() // 释放worker槽位
            
            recWorker(delivery)
        }(d)
    }

    wg.Wait()
}

func recWorker(delivery amqp.Delivery) {
    log.Printf("Received a message: %s", delivery.Body)
    time.Sleep(time.Second * 2)
    delivery.Ack(false)
}

或者使用带缓冲的通道控制并发:

func Receiver() {
    // ... 前面的连接和声明代码不变 ...

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    semaphore := make(chan struct{}, 3) // 信号量限制并发数

    for d := range msgs {
        semaphore <- struct{}{} // 如果已满3个,这里会阻塞
        
        go func(delivery amqp.Delivery) {
            defer func() { <-semaphore }()
            
            log.Printf("Processing message: %s", delivery.Body)
            time.Sleep(time.Second * 2)
            delivery.Ack(false)
        }(d)
    }
}

关键点:

  1. Qos(3) 限制通道上最多有3条未确认的消息
  2. 您的原始代码中,主goroutine持续从 msg 通道读取,即使worker goroutine还未完成处理
  3. 需要控制并发goroutine的数量,确保同时处理的消息不超过Qos设置的值
  4. 消息确认必须在处理完成后执行,否则会超过prefetch限制
回到顶部