Python中如何使用django-celery实现动态添加周期任务

项目中现在有个需求是用户可以在前端页面添加定时或者周期任务,
后端的话目前采用 django-celery 来实现此类任务,
请问如何实现动态添加任务且不用重启 celery 相关进程?
Python中如何使用django-celery实现动态添加周期任务

11 回复

django-celery-beat


在Django项目中用django-celery实现动态添加周期任务,核心是操作django_celery_beat.models.PeriodicTaskCrontabSchedule模型。下面是一个完整示例:

首先确保已安装并配置好django-celery-beat:

pip install django-celery-beat

在settings.py中添加:

INSTALLED_APPS = [
    ...
    'django_celery_beat',
]

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

然后创建动态任务的代码:

from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json

def add_periodic_task(task_name, task_path, crontab_time, args=None, kwargs=None):
    """
    动态添加周期任务
    
    Args:
        task_name: 任务名称(唯一标识)
        task_path: Celery任务路径,如'myapp.tasks.my_task'
        crontab_time: 定时设置,格式为(minute, hour, day_of_month, month_of_year, day_of_week)
        args: 位置参数列表
        kwargs: 关键字参数字典
    """
    # 创建或获取crontab计划
    minute, hour, day_of_month, month_of_year, day_of_week = crontab_time
    schedule, _ = CrontabSchedule.objects.get_or_create(
        minute=minute,
        hour=hour,
        day_of_month=day_of_month,
        month_of_year=month_of_year,
        day_of_week=day_of_week,
        timezone='Asia/Shanghai'  # 根据实际情况调整时区
    )
    
    # 创建周期任务
    task_args = json.dumps(args) if args else '[]'
    task_kwargs = json.dumps(kwargs) if kwargs else '{}'
    
    PeriodicTask.objects.update_or_create(
        name=task_name,
        defaults={
            'task': task_path,
            'crontab': schedule,
            'enabled': True,
            'args': task_args,
            'kwargs': task_kwargs,
        }
    )

使用示例:

# 添加一个每天凌晨2点执行的任务
add_periodic_task(
    task_name='daily_report',
    task_path='reports.tasks.generate_daily_report',
    crontab_time=('0', '2', '*', '*', '*'),  # 分钟, 小时, 日, 月, 周几
    kwargs={'report_type': 'summary'}
)

# 添加一个每周一上午9点的任务
add_periodic_task(
    task_name='weekly_cleanup',
    task_path='maintenance.tasks.cleanup_old_data',
    crontab_time=('0', '9', '*', '*', '1')  # 周一
)

要删除或禁用任务:

# 禁用任务
task = PeriodicTask.objects.get(name='daily_report')
task.enabled = False
task.save()

# 删除任务
PeriodicTask.objects.filter(name='weekly_cleanup').delete()

关键点:

  1. 使用CrontabSchedule定义执行时间
  2. PeriodicTask关联具体Celery任务和计划
  3. 通过update_or_create避免重复创建
  4. 参数需要JSON序列化存储

动态任务添加后,Celery beat会自动检测数据库变化并调整调度。记得重启beat服务使新任务生效。

一句话总结:直接操作django_celery_beat的模型来管理动态周期任务。

可以看看 django-celery-beat 这个项目

https://github.com/sibson/redbeat 可以看下这个项目,最近在用,基于 redis 做的,比较通用

djang-celery 不是已经淘汰了吗?我记得已经合并入 celery 本身了啊

如果任务比较轻,时间短直接 while 循环吧,celery 有时假死在那,或者后端 borker 出故障也会出问题。繁重任务还是用专门的消息队列处理吧

  • 5L 正解
    - django-celery 在 celery 升级到 4.0 后已经废弃
    - 最新的 celery 结合 django-celery-beat 可以实现实现动态添加周期任务

可以参考一下我的爬虫项目 crawlab,http://www.github.com/tikazyq/crawlab,之前折腾 celery beat 没有成功,后来转用 apscheduler 就可以动态添加定时任务了 ;-)

文件地址: https://github.com/tikazyq/crawlab/blob/master/crawlab/tasks/scheduler.py

我看的一个项目叫 django-Q https://django-q.readthedocs.io/en/latest/ 因为周期任务都在数据库中,只要填函数名字和参数就可以添加任务

回到顶部