Python中如何实现延迟任务的增加和删除
两个 rest api
1. 增加一个延迟任务
2. 删除某延迟任务
延迟的时间很大,可能 2 周.2 个月都有可能.
这样的需求用啥做比较好?
apscheduler 的问题是 起多个进程处理起来比较麻烦.
celery 可以处理类似问题吗?
谢谢了
Python中如何实现延迟任务的增加和删除
任务是一次性的,不是定时任务.
在Python里搞延迟任务,用asyncio或者threading.Timer都行。asyncio适合异步应用,Timer适合简单场景。下面给你两个能直接跑的方案。
方案一:用asyncio(推荐给异步程序)
这个方案用asyncio.create_task、asyncio.sleep和asyncio.CancelledError来管理任务。
import asyncio
class AsyncDelayManager:
def __init__(self):
self.tasks = {} # 存任务ID和对应的task对象
self.next_id = 0
async def _exec_task(self, task_id, delay, func, *args, **kwargs):
try:
await asyncio.sleep(delay)
await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
except asyncio.CancelledError:
print(f"任务 {task_id} 被取消")
finally:
self.tasks.pop(task_id, None)
def add_task(self, delay, func, *args, **kwargs):
"""增加延迟任务,返回任务ID"""
task_id = self.next_id
self.next_id += 1
task = asyncio.create_task(self._exec_task(task_id, delay, func, *args, **kwargs))
self.tasks[task_id] = task
return task_id
def cancel_task(self, task_id):
"""取消任务"""
if task_id in self.tasks:
self.tasks[task_id].cancel()
return True
return False
# 使用示例
async def my_task(name):
print(f"执行任务: {name}")
async def main():
manager = AsyncDelayManager()
# 增加3秒后执行的任务
task_id = manager.add_task(3, my_task, "hello")
print(f"已创建任务 ID: {task_id}")
# 取消任务(把这行注释掉就能看到任务执行)
# manager.cancel_task(task_id)
# 等足够时间让任务执行或取消
await asyncio.sleep(5)
# 运行
asyncio.run(main())
方案二:用threading.Timer(适合同步程序) 这个更简单直接,但注意它是在独立线程里跑的。
import threading
from typing import Callable, Any
class TimerDelayManager:
def __init__(self):
self.timers = {} # 存任务ID和Timer对象
self.next_id = 0
def add_task(self, delay: float, func: Callable, *args, **kwargs) -> int:
"""增加延迟任务,返回任务ID"""
def task_wrapper():
try:
func(*args, **kwargs)
finally:
self.timers.pop(task_id, None)
task_id = self.next_id
self.next_id += 1
timer = threading.Timer(delay, task_wrapper)
timer.start()
self.timers[task_id] = timer
return task_id
def cancel_task(self, task_id: int) -> bool:
"""取消任务"""
if task_id in self.timers:
self.timers[task_id].cancel()
self.timers.pop(task_id, None)
return True
return False
# 使用示例
def my_sync_task(name):
print(f"执行同步任务: {name}")
if __name__ == "__main__":
manager = TimerDelayManager()
# 增加5秒后执行的任务
task_id = manager.add_task(5, my_sync_task, "world")
print(f"已创建任务 ID: {task_id}")
# 取消任务(把这行注释掉就能看到任务执行)
# manager.cancel_task(task_id)
# 主线程等一会儿
threading.Event().wait(7)
选哪个?
- 如果你的程序已经是
asyncio的,用方案一。 - 如果是普通同步程序,用方案二。
一句话建议:异步程序用asyncio方案,同步程序用Timer方案。
距离成功只差最后一步.
django 中如何使用 apscheduler?
只启动一次,可以在多个 view 中使用.
并且在启动的时候,可以使用 django.cache.lock
感觉方向不是很恰当,celery 可以实现,直接使用 task.delay 或者 send_task 配合 countdown 参数就可以直接延迟执行,刚好也是执行一次.
但是 2 周好久啊,内存已经不靠谱了,celery+redis 勉强可以算作是可靠的吧
如果我来做我选择数据库+crontab
赞同#3,延迟那么久用什么异步没意义了,还是持久化到数据库然后跑定时任务比较好
任务丢出去,如何取消?
这不是定时任务,是一次性延时任务.并且可以被取消.你的意思是说,定时任务跑起来,然后删掉?
老老实实丢在数据库里面,再加个定时任务来消费,数据库的事务性能免去一大堆用其他乱七八糟 queue 带来的麻烦
任务开始跑了不用再取消吧,没跑不就还在数据库嘛,删掉或者标记一下就好了呀。我们说的定时任务是一种服务,你可以设定每隔多久跑一次,然后你就可以设置比如每分钟数据库查一下有没有到期的任务,有就扔给 worker 跑就好了。celery 也有类似功能,搜“ celery cron ”
https://github.com/snower/forsun
使用 redis 保存的话,开 redis 持久化就不会丢失,添加删除都很方便

