在rabbitmq中如何让未被ack的消息能重新进入queue中再次被Nodejs取到?

在rabbitmq中如何让未被ack的消息能重新进入queue中再次被Nodejs取到?

各位好,

我刚学rabbitmq,现有个问题。我用的是rabbitmq+nodejs+amqplib,当我打开了noAck:false

  chnl.consume('task_queue', doWork, {noAck: false});
function doWork(msg) {
    var body = msg.content.toString();
    console.log(" [x] Received '%s'", body);
    var secs = body.split('.').length - 1;
    console.log(" [x] Task takes %d seconds", secs);
    setTimeout(function() {
        console.log(" [x] Done");
        if(body=='Hello World3!'){
        //不会触发chnl.ack(msg)方法,用来模拟业务操作失败,这样这个hello world3!的消息也还在rabbitmq中,不会丢失,但我还是想这些消息再次进入queue中并能被再次取出,如发送邮件失败后还会再重新发送。请问如何再激活这条消息,难道非要重启这个程序吗?
        }else{
            console.log('ack');
            chnl.ack(msg);
        }
    }, secs * 1000);
}</code></pre>

6 回复

要在RabbitMQ中实现未被确认(未ACK)的消息能够重新进入队列,并且能够在Node.js中再次被消费,你可以使用basicRejectbasicNack方法来拒绝消息。当消息被拒绝时,RabbitMQ会将其重新放回队列,从而可以被其他消费者重新处理。

示例代码

假设你已经有一个基本的RabbitMQ连接和通道设置:

const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', (err, conn) => {
    conn.createChannel((err, chnl) => {
        const queue = 'task_queue';

        chnl.assertQueue(queue, { durable: true });

        chnl.consume(queue, doWork, { noAck: false });

        function doWork(msg) {
            const body = msg.content.toString();
            console.log(" [x] Received '%s'", body);

            const secs = body.split('.').length - 1;

            setTimeout(() => {
                console.log(" [x] Done");

                if (body === 'Hello World3!') {
                    // 模拟业务操作失败,拒绝消息
                    console.log('Message rejected');
                    chnl.nack(msg, false, true); // 重新放入队列
                } else {
                    console.log('ack');
                    chnl.ack(msg); // 确认消息
                }
            }, secs * 1000);
        }
    });
});

解释

  • chnl.consume 方法用于启动一个消费者,监听指定队列的消息。
  • noAck: false 表示消费者需要手动确认消息的处理结果。
  • doWork 函数中:
    • 如果消息处理成功 (Hello World3! 除外),则调用 chnl.ack(msg) 来确认消息。
    • 如果消息处理失败,则调用 chnl.nack(msg, false, true) 来拒绝消息。这里:
      • 第一个参数 msg 是消息对象。
      • 第二个参数 false 表示不重新排队单个消息副本。
      • 第三个参数 true 表示重新排队所有未确认的消息。

通过这种方式,你可以确保在处理失败的情况下,消息会被重新放入队列中,从而可以被其他消费者重新处理。


我最近也在用rabbitmq+node.js,很想和楼主讨论一下这方面的问题,呵呵

你在失败的时候再publish到队列中呗,然后ack

可是实际上这条message还是在队列中的,如果再手工publish一下,会有两条信息的。

原来可以通过channel.nack(message)来让不通过的消息再次进入消息队列。

if(body==‘Hello World3!’){ chnl.nack(msg); //这样就可以让这个消息再次进入队列而不用重启服务。 }else{ console.log(‘ack’); chnl.ack(msg); }

在 RabbitMQ 中,如果你没有确认(ack)一条消息,RabbitMQ 默认会在消息的生存时间(TTL)内重试分发该消息。如果消息一直没有被确认,RabbitMQ 会将该消息重新放回队列。

根据你提供的代码,你已经设置了 noAck: false,这意味着你需要手动确认消息。对于你希望处理失败的情况(例如,当 body'Hello World3!' 时),你需要调用 chnl.nack(msg) 方法来否定确认。这样 RabbitMQ 会知道消息处理失败,并将其重新放回队列中。

以下是修改后的示例代码:

const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, chnl) => {
    const queue = 'task_queue';

    chnl.assertQueue(queue, { durable: true });
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);

    chnl.consume(queue, doWork, { noAck: false });

    function doWork(msg) {
      var body = msg.content.toString();
      console.log(" [x] Received '%s'", body);
      var secs = body.split('.').length - 1;

      console.log(" [x] Task takes %d seconds", secs);
      setTimeout(() => {
        console.log(" [x] Done");

        if (body === 'Hello World3!') {
          // 模拟业务操作失败
          chnl.nack(msg);  // 否定确认消息,消息将重新入队
        } else {
          console.log('ack');
          chnl.ack(msg);  // 确认消息
        }
      }, secs * 1000);
    }
  });
});

在这个例子中,当 body'Hello World3!' 时,我们调用 chnl.nack(msg) 来否定确认该消息,导致 RabbitMQ 将其重新放入队列中。其他情况下,我们会正常确认消息。这样可以确保所有消息即使处理失败也会被重新放回队列中等待再次处理。

回到顶部