Python分布式任务队列Celery中如何使用队列进行任务调度

使用 Celery 队列启动成功后,在 ipython 中测试, 命令是 :celery -A celery_learn worker -l info -Q for_task_B

[In] 1: from celery_learn.tasks import * 
[In] 2: result = add1.delay(34, 65)
[In] 3: result.status
[Out] 3: u'PENDING'

任务一直没有执行,这是怎么回事???


Python分布式任务队列Celery中如何使用队列进行任务调度

3 回复

在Celery里用队列做任务调度,核心就是配置好路由规则。下面是个完整示例:

首先安装Celery和Redis(作为broker):

pip install celery redis

创建 tasks.py

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

# 定义队列
app.conf.task_queues = {
    'high_priority': {'exchange': 'high_priority', 'routing_key': 'high_priority'},
    'low_priority': {'exchange': 'low_priority', 'routing_key': 'low_priority'},
    'default': {'exchange': 'default', 'routing_key': 'default'}
}

# 路由配置
app.conf.task_routes = {
    'tasks.process_urgent': {'queue': 'high_priority'},
    'tasks.process_background': {'queue': 'low_priority'},
    '*': {'queue': 'default'}  # 默认队列
}

@app.task
def process_urgent(data):
    print(f"处理紧急任务: {data}")
    return f"紧急任务完成: {data}"

@app.task  
def process_background(data):
    print(f"处理后台任务: {data}")
    return f"后台任务完成: {data}"

@app.task
def default_task(data):
    print(f"处理默认任务: {data}")
    return f"默认任务完成: {data}"

启动worker时指定监听的队列:

# worker1只处理高优先级队列
celery -A tasks worker --queues=high_priority --loglevel=info

# worker2处理低优先级和默认队列  
celery -A tasks worker --queues=low_priority,default --loglevel=info

发送任务时,任务会自动路由到指定队列:

from tasks import process_urgent, process_background, default_task

# 这些任务会进入不同队列
process_urgent.delay("重要数据")
process_background.delay("日志数据")  
default_task.delay("普通数据")

还可以动态指定队列:

process_urgent.apply_async(args=["动态数据"], queue='custom_queue')

关键就两点:1)在配置里定义队列和路由规则;2)启动worker时指定要监听的队列。这样就能实现任务分类处理了。

总结:配置路由规则,worker按队列监听。


之前没有查看文档,下次先查了再提问了

回到顶部