Python分布式任务队列Celery中如何使用队列进行任务调度
使用 Celery 队列启动成功后,在 ipython 中测试, 命令是 :celery -A celery_learn worker -l info -Q for_task_B
[In] 1: from celery_learn.tasks import *
[In] 2: result = add1.delay(34, 65)
[In] 3: result.status
[Out] 3: u'PENDING'
任务一直没有执行,这是怎么回事???
Python分布式任务队列Celery中如何使用队列进行任务调度
3 回复
在Celery里用队列做任务调度,核心就是配置好路由规则。下面是个完整示例:
首先安装Celery和Redis(作为broker):
pip install celery redis
创建 tasks.py:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
# 定义队列
app.conf.task_queues = {
'high_priority': {'exchange': 'high_priority', 'routing_key': 'high_priority'},
'low_priority': {'exchange': 'low_priority', 'routing_key': 'low_priority'},
'default': {'exchange': 'default', 'routing_key': 'default'}
}
# 路由配置
app.conf.task_routes = {
'tasks.process_urgent': {'queue': 'high_priority'},
'tasks.process_background': {'queue': 'low_priority'},
'*': {'queue': 'default'} # 默认队列
}
@app.task
def process_urgent(data):
print(f"处理紧急任务: {data}")
return f"紧急任务完成: {data}"
@app.task
def process_background(data):
print(f"处理后台任务: {data}")
return f"后台任务完成: {data}"
@app.task
def default_task(data):
print(f"处理默认任务: {data}")
return f"默认任务完成: {data}"
启动worker时指定监听的队列:
# worker1只处理高优先级队列
celery -A tasks worker --queues=high_priority --loglevel=info
# worker2处理低优先级和默认队列
celery -A tasks worker --queues=low_priority,default --loglevel=info
发送任务时,任务会自动路由到指定队列:
from tasks import process_urgent, process_background, default_task
# 这些任务会进入不同队列
process_urgent.delay("重要数据")
process_background.delay("日志数据")
default_task.delay("普通数据")
还可以动态指定队列:
process_urgent.apply_async(args=["动态数据"], queue='custom_queue')
关键就两点:1)在配置里定义队列和路由规则;2)启动worker时指定要监听的队列。这样就能实现任务分类处理了。
总结:配置路由规则,worker按队列监听。
之前没有查看文档,下次先查了再提问了

