Python中关于djcelery配置queue不生效的问题探讨 [附带workflow子任务锁死问题分析]
我 celery 版本为 v3.1.25 ,django 和 celery 的启动项如下:
celery -A proj-test worker -n workerA.%h --concurrency=2
celery -A proj-test worker -n workerB.%h --concurrency=2
python manage.py celeryd -B
python manage.py celery beat
python manage.py runserver 0.0.0.0:8000
这样是可以运行的,不过好像由于我在 celery 的 task 里面,使用了 workflow 的 chord/group 进行 subtask 分发,导致 subtask 过多时,因为 concurrency 有限,beat 任务( beat_function )会因此堵塞结束不掉。
大概代码如下,下面至少有两个 chord:
[@task](/user/task)
def beat_function():
xxfunc1.delay()
xxfunc2.delay()
time.sleep(5)
chord((test1.s(x) for x in xs) , test2.s())
chord((test3.s(y) for y in ys) , test4.s())
所以我有个问题,想问问大家能否指定 beat 的 queue。
让它不会占用其他 subtask 运行的 worker 容量,这样就算 beat 所在的 queue 在阻塞,等到 subtask 运行完毕,这边 queue 也会结束相应的 beat 任务。
我是这样弄得,queue 配置如下:
CELERY_QUEUES = {
"celery_beat": {
"exchange": "celery_beat",
"exchange_type": "direct",
"routing_key": "celery_beat"
},
}
尝试过的 beat 运行方式如下两种
python manage.py celeryd -B -Q celery_beat
也尝试了下面的运行法子:
python manage.py celery worker -B -Q celery_beat
不过始终不能把 beat 任务运行起来,每次一加 queue 似乎 beat 就失效了。
尝试过同时单独运行,也失败:
python manage.py celery worker --beat
python manage.py celery worker -Q celery_beat
希望大佬们,帮忙解答下面两个问题:
- 单独 queue 运行 beat 任务能否解决我那个死锁问题?
我不能每次手动结束 beat 任务,确实结束了能解决某一次死锁,那些 subtask 能运行完。我曾尝试设置任务超时,单个 chord 的时可用,该 chord 可以运行完结,不过后面还有 chord 分发其他子任务就 gg 了,也就是说只能运行完第一个 chord,比如我上面举的例子。
- 我的上面的运行 beat 的方法是否有误,加了 queue 配置是否有误或者不完整,导致我 beat 任务运行不起来?
我想要实现的是,单个 queue 指定的那些 worker,运行所有的 beat 任务,现在的情况是所有的 worker 都会去抢 beat 任务,结果可能导致了部分 beat 任务阻塞。
Python中关于djcelery配置queue不生效的问题探讨 [附带workflow子任务锁死问题分析]
Woc,没人给点建议么,屌大的大佬们呢,都换上女装去泡吧了么?!
我遇到过类似的问题,djcelery的queue配置确实有些坑。核心问题通常是配置位置不对或者worker启动参数没跟上。
直接上代码,这是能跑通的配置方案:
1. settings.py配置
# Celery配置
CELERY_QUEUES = (
Queue('default', routing_key='default'),
Queue('high_priority', routing_key='high_priority'),
Queue('low_priority', routing_key='low_priority'),
)
CELERY_ROUTES = {
'app.tasks.process_data': {'queue': 'high_priority', 'routing_key': 'high_priority'},
'app.tasks.cleanup_task': {'queue': 'low_priority', 'routing_key': 'low_priority'},
}
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_ROUTING_KEY = 'default'
2. 启动worker时指定队列
# 启动专门处理高优先级队列的worker
celery -A your_project worker -l info -Q high_priority
# 或者启动处理多个队列的worker
celery -A your_project worker -l info -Q high_priority,default
3. 关于workflow子任务锁死问题
这通常是任务依赖没处理好导致的。用chain或group时要注意:
from celery import chain, group
# 错误示例:可能导致锁死
result = chain(task1.s(), task2.s(), task3.s()).apply_async()
# 建议使用回调或chord
chord(header=[task1.s(), task2.s()], body=task3.s()).apply_async()
关键检查点:
- 确认CELERY_QUEUES和CELERY_ROUTES都正确定义了
- worker启动时用
-Q参数指定了要监听的队列 - 任务装饰器没覆盖路由设置:
@task(queue='xxx')会覆盖CELERY_ROUTES - 检查broker(Redis/RabbitMQ)的连接是否正常
总结:配置要对齐,启动参数要跟上。
beat 单独起啊,celery -A xxx beat
单独起了,会出现我上述的死锁问题,所以才想把 beat 任务单独给一个 queue。
但是结果好像不尽如人意,不知道我 router 和 queue 是否配置有问,还是压根不能这样做。
call subtask 的时候不要同步 call,官方有针对死锁的说明
有看过,3.2 以后好像还针对这种情况抛出错误,只是实在没想到更优的解法,来解决一堆 subtask 的问题。
可以把你的任务结构贴一下看看?
嗯,回头我整理下贴出来,代码在公司,兄 dei 长假快乐~
后来用单个 chord 串联了执行了一堆任务,暂时解决了,虽然法子有点笨,老兄回头有空看看我最新的提问…

