Python中Celery配合RabbitMQ进行异步任务时,为什么RabbitMQ里消息数量一直增加但任务已被处理?

查看 rabbimtq 自带的后台,发现 Queued messages 里面,ready 和 total 的数量都达到了 5000 多,unacknowledged 的值为 0。不过实际在工作的时候,unacknowledged 的值会变化的,但是最后会变成 0
celery 的配置也没有特殊的地方,就只设置了如下的内容
CELERY_IMPORTS = (‘testtasks’,)
BROKER_URL = ‘amqp://guest:guest@localhost:5672//’
CELERY_RESULT_BACKEND = ‘amqp://’

然后看系统的资源,erl 的进程内存比较大,大概暂用了 300 多 M
是我这边 celery 的配置有问题么?
Python中Celery配合RabbitMQ进行异步任务时,为什么RabbitMQ里消息数量一直增加但任务已被处理?


16 回复

只用过 Redis 的路过帮顶。。。


这种情况通常是任务执行后没有正确发送确认(ACK)给RabbitMQ导致的。

核心原因是:Celery默认采用acknowledgement机制,任务执行成功后需要显式确认,RabbitMQ才会从队列中删除消息。如果任务处理过程中没有正确ACK,即使任务已执行,消息仍会留在队列中(或重新入队)。

常见原因和解决方案:

  1. 确认模式设置问题
# 确保任务配置正确
app.conf.task_acks_late = True  # 任务完成后才确认
app.conf.task_reject_on_worker_lost = True  # 工作进程异常时拒绝任务
  1. 任务异常处理不当
@app.task(bind=True)
def my_task(self):
    try:
        # 任务逻辑
        result = do_something()
        return result
    except Exception as exc:
        # 重要:任务失败时需要明确处理
        self.retry(exc=exc, countdown=60)
        # 或者明确拒绝
        # raise self.retry(exc=exc)
  1. 检查worker启动参数
# 确保使用正确的确认策略
celery -A proj worker --loglevel=info --without-gossip --without-mingle --without-heartbeat
  1. 监控确认状态
# 查看未确认的消息
from celery import current_app
inspector = current_app.control.inspect()
# 检查活跃任务和预取计数
stats = inspector.stats()

快速检查步骤:

  1. 确认worker是否正常处理任务(查看日志)
  2. 检查RabbitMQ管理界面中的"Unacked"消息数
  3. 验证任务函数是否正常返回,没有静默异常

建议:检查任务确认配置和异常处理逻辑。

CELERY_RESULT_BACKEND 你要不指到 redis 里面,,,

用 countdown 了吗?

我在 task 里面做的只是调用了另外的方法,countdown 是在什么时候使用呢? 谢谢

RabbitMQ 不是自带了一个挺好用的 Management Plugin 吗,配置好了就容易看到具体是什么消息堆住了

处理了没 acc?

设置了消息应答了吗,消息需要 ack

unacknowledged 指的是,消息发给 consumer 了,但是没有 ack ;正常情况是会有的,如果你使用了延迟任务,会更多一点;

嗯 , 但是现在是 unacknowledged 的值是 0,那我理解的话,应该 ready 和 total 应该也没有才对。
为啥 ready 和 total 的值是那么多,而且感觉是这个消息还是在 rabbitmq 里存着的,不然为啥进程的内存占用那么多。

按我的理解,如果 unacknowledged 的为 0 了, 意味这消息被出来了,不会被存到某个队列里才对。

unacknowledged 的值为 0 了,这个不是表示已经应答了么?

acc 是怎么个处理法?求指导

#11 就是发一个 ack,回去

已经解决了,升级了 rabbitmq 到 3.3 以上版本就可以了。
之前用的是 rabbitmq 3.1 的版本,然后 celery 是 3.1 的版本

unack 只是一个状态,代表有 consumer 消费了这个消息,但是还没确认;只有当 consumer 确认之后,RMQ 才不用再存储这个消息

谢谢,原来如此

回到顶部