Python中关于Celery的queue配置问题

celery 里面有个配置时,当指定队列不存在时,创建队列

task_create_missing_queues Default: Enabled

这么使 foo.apply_async(args=(ds.id,), queue=“xxxx”)

我想问这个配置和 conf 里配置的

CELERY_QUEUES = (

  Queue('xxxx', Exchange('xxxx'), routing_key='xxxx'),                     

)

有什么区别么

比如我有很多任务,之前解决这个问题,是把不同的任务分配到定义在 conf 里的不同队列

是不是我不用显示定义,分配任务的时候 foo.apply_async(args=(ds.id,), queue=“random ()”)

也可以??


Python中关于Celery的queue配置问题

1 回复

在Celery中配置队列(queue)主要是在celeryconfig.py或主配置文件中使用task_routestask_queues

首先,定义你的队列。通常和CELERY_QUEUES一起设置:

# celeryconfig.py
from kombu import Exchange, Queue

# 定义交换机
default_exchange = Exchange('default', type='direct')
video_exchange = Exchange('video', type='direct')

# 定义队列
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('video_tasks', video_exchange, routing_key='video.process'),
)

然后,通过task_routes把任务路由到指定队列:

# celeryconfig.py
CELERY_TASK_ROUTES = {
    'myapp.tasks.process_video': {
        'queue': 'video_tasks',
        'routing_key': 'video.process',
    },
}

或者直接在任务上用@task装饰器指定:

# tasks.py
from celery import Celery
app = Celery('myapp')

@app.task(queue='video_tasks')
def process_video(video_id):
    # 处理视频的逻辑
    pass

启动worker时,用-Q参数指定要监听的队列:

celery -A myapp worker -Q video_tasks,default -l INFO

总结:在CELERY_QUEUES里定义队列,用路由规则或装饰器把任务分配过去,启动worker时指定监听的队列就行。

回到顶部