Python中如何实现延迟任务的增加和删除

两个 rest api
1. 增加一个延迟任务
2. 删除某延迟任务

延迟的时间很大,可能 2 周.2 个月都有可能.

这样的需求用啥做比较好?

apscheduler 的问题是 起多个进程处理起来比较麻烦.
celery 可以处理类似问题吗?


谢谢了
Python中如何实现延迟任务的增加和删除

9 回复

任务是一次性的,不是定时任务.


在Python里搞延迟任务,用asyncio或者threading.Timer都行。asyncio适合异步应用,Timer适合简单场景。下面给你两个能直接跑的方案。

方案一:用asyncio(推荐给异步程序) 这个方案用asyncio.create_taskasyncio.sleepasyncio.CancelledError来管理任务。

import asyncio

class AsyncDelayManager:
    def __init__(self):
        self.tasks = {}  # 存任务ID和对应的task对象
        self.next_id = 0

    async def _exec_task(self, task_id, delay, func, *args, **kwargs):
        try:
            await asyncio.sleep(delay)
            await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
        except asyncio.CancelledError:
            print(f"任务 {task_id} 被取消")
        finally:
            self.tasks.pop(task_id, None)

    def add_task(self, delay, func, *args, **kwargs):
        """增加延迟任务,返回任务ID"""
        task_id = self.next_id
        self.next_id += 1
        task = asyncio.create_task(self._exec_task(task_id, delay, func, *args, **kwargs))
        self.tasks[task_id] = task
        return task_id

    def cancel_task(self, task_id):
        """取消任务"""
        if task_id in self.tasks:
            self.tasks[task_id].cancel()
            return True
        return False

# 使用示例
async def my_task(name):
    print(f"执行任务: {name}")

async def main():
    manager = AsyncDelayManager()
    
    # 增加3秒后执行的任务
    task_id = manager.add_task(3, my_task, "hello")
    print(f"已创建任务 ID: {task_id}")
    
    # 取消任务(把这行注释掉就能看到任务执行)
    # manager.cancel_task(task_id)
    
    # 等足够时间让任务执行或取消
    await asyncio.sleep(5)

# 运行
asyncio.run(main())

方案二:用threading.Timer(适合同步程序) 这个更简单直接,但注意它是在独立线程里跑的。

import threading
from typing import Callable, Any

class TimerDelayManager:
    def __init__(self):
        self.timers = {}  # 存任务ID和Timer对象
        self.next_id = 0

    def add_task(self, delay: float, func: Callable, *args, **kwargs) -> int:
        """增加延迟任务,返回任务ID"""
        def task_wrapper():
            try:
                func(*args, **kwargs)
            finally:
                self.timers.pop(task_id, None)
        
        task_id = self.next_id
        self.next_id += 1
        
        timer = threading.Timer(delay, task_wrapper)
        timer.start()
        self.timers[task_id] = timer
        return task_id

    def cancel_task(self, task_id: int) -> bool:
        """取消任务"""
        if task_id in self.timers:
            self.timers[task_id].cancel()
            self.timers.pop(task_id, None)
            return True
        return False

# 使用示例
def my_sync_task(name):
    print(f"执行同步任务: {name}")

if __name__ == "__main__":
    manager = TimerDelayManager()
    
    # 增加5秒后执行的任务
    task_id = manager.add_task(5, my_sync_task, "world")
    print(f"已创建任务 ID: {task_id}")
    
    # 取消任务(把这行注释掉就能看到任务执行)
    # manager.cancel_task(task_id)
    
    # 主线程等一会儿
    threading.Event().wait(7)

选哪个?

  • 如果你的程序已经是asyncio的,用方案一。
  • 如果是普通同步程序,用方案二。

一句话建议:异步程序用asyncio方案,同步程序用Timer方案。

距离成功只差最后一步.

django 中如何使用 apscheduler?
只启动一次,可以在多个 view 中使用.
并且在启动的时候,可以使用 django.cache.lock

感觉方向不是很恰当,celery 可以实现,直接使用 task.delay 或者 send_task 配合 countdown 参数就可以直接延迟执行,刚好也是执行一次.
但是 2 周好久啊,内存已经不靠谱了,celery+redis 勉强可以算作是可靠的吧

如果我来做我选择数据库+crontab

赞同#3,延迟那么久用什么异步没意义了,还是持久化到数据库然后跑定时任务比较好



任务丢出去,如何取消?

这不是定时任务,是一次性延时任务.并且可以被取消.你的意思是说,定时任务跑起来,然后删掉?

老老实实丢在数据库里面,再加个定时任务来消费,数据库的事务性能免去一大堆用其他乱七八糟 queue 带来的麻烦

任务开始跑了不用再取消吧,没跑不就还在数据库嘛,删掉或者标记一下就好了呀。我们说的定时任务是一种服务,你可以设定每隔多久跑一次,然后你就可以设置比如每分钟数据库查一下有没有到期的任务,有就扔给 worker 跑就好了。celery 也有类似功能,搜“ celery cron ”

https://github.com/snower/forsun

使用 redis 保存的话,开 redis 持久化就不会丢失,添加删除都很方便

回到顶部