Python分布式任务队列框架Celery中,不同类型的任务配置config,是不是要在初始化的时候分别实例化?

比如,三个异步任务,分别是:

1、一个普通的异步任务
2、一个每隔 10 分钟执行一次的任务
3、每天 12 点执行一次的任务

这三个任务都需要应用上下文支持,是不是要在初始化的时候,分别实例化?

celery1 = Celery(name, …)
celery2 = Celery(name, …)
celery3 = Celery(name, …)

感觉好别扭啊
Python分布式任务队列框架Celery中,不同类型的任务配置config,是不是要在初始化的时候分别实例化?


6 回复

是在一个 Celery 里注册这三个任务…

难道你开十个普通异步任务要十个实例吗…


在Celery里,不同类型的任务通常不需要在初始化时分别实例化不同的Celery实例。最佳实践是创建一个主Celery应用实例,然后通过配置或装饰器来管理不同类型的任务。

核心方法是使用Celery类的config_from_object方法,或者直接设置app.conf。你可以根据环境或任务类型加载不同的配置模块。

# celery_app.py
from celery import Celery

# 创建主Celery应用实例
app = Celery('my_project')

# 基础配置
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
)

# 根据环境加载不同配置(示例:生产环境)
# app.config_from_object('celery_config_production')

# 自动从当前目录下的`tasks`模块发现任务
app.autodiscover_tasks(['tasks'])

对于需要不同配置的任务组,可以使用任务路由队列来区分,而不是创建多个实例:

# 在配置中定义任务路由
app.conf.update(
    task_routes={
        'tasks.process_image': {'queue': 'image_queue'},
        'tasks.send_email': {'queue': 'email_queue'},
    },
    # 为不同队列设置不同的配置(例如,不同的并发数)
    task_queues={
        'image_queue': {
            'exchange': 'images',
            'routing_key': 'image.process',
        },
        'email_queue': {
            'exchange': 'emails',
            'routing_key': 'email.send',
        }
    }
)

然后,在任务模块中使用@app.task装饰器定义任务,它们会自动注册到主应用实例:

# tasks.py
from celery_app import app

@app.task
def process_image(image_path):
    # 处理图片的任务
    pass

@app.task
def send_email(recipient, message):
    # 发送邮件的任务
    pass

总结来说,一个Celery主实例配合任务路由管理不同类型任务即可

开十个普通异步任务我知道呀,一般用装饰器装饰一下就可用 .task_A() .task_B() .task_C() … 这些普通异步任务都是用相同的 Celery config。

我说的问题是定时任务,如“一个每隔 10 分钟执行一次的任务”,“每天 12 点执行一次的任务”这些配置不同啊,直接复用 .task_*(),这怎么区分是普通异步任务?还是定时异步任务呢?

Celery 不区分这个。Celery 的定时任务需要一个专门的守护进程 celery beep。大致原理就是 celery beep 发起一个异步任务,由 celery worker 执行。

可以在配置里面写,也可以在任务中指定

CELERYBEAT_SCHEDULE = {
‘task_a’: {
‘task’: ‘first_task’,
‘schedule’: crontab(hour=18, minute=00),
‘kwargs’: {‘stype’: ‘example’}
},
“task_b”: {
“task”: “second_task”,
“schedule”: crontab(hour=16, minute=30)
}
}

回到顶部