Python中Celery任务出现[Soft time limit exceeded]错误,如何强制结束任务?
在使用 soft_time_limit 的时候遇到了一个问题,debug 信息已经显示类似以下信息:
Soft time limit (180s) exceeded
@celery.task(soft_time_limit=180)
def test_error(para):
do_something…
然后我去查了下 task id,还是处于 starting 状态。 多个任务出现这种情况,有的再过一段时间会变成 SUCCESS,还有的任务就干脆放弃治疗,一直卡在 starting 了。
看了个解决方法,是在调用任务时 catch 错误 SoftTimeLimitExceeded:
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
不过我的任务函数 test_error 是在 workflow 里的 chord 调用的,不太好 catch:
chord( ( test_error.s(x) for x in y) , test_result.s() )().get()
我原本还以为 soft_time_limit 算是 time_limit 的不报错版本,会自动 continue 啥的。test_error 这里如果直接使用 time_limit 的话,报错似乎会让 chord 直接失败。
哪位大佬能给个好的解决办法?
Python中Celery任务出现[Soft time limit exceeded]错误,如何强制结束任务?
不是很明白你的表述,是这样用的意思么?<br>():<br>def mytask():<br> chord( ( test_error.s(x) for x in y) , test_result.s() )().get()<br>
遇到Celery的SoftTimeLimitExceeded错误,说明任务执行时间超过了软时间限制但还没到硬限制。这时候任务还在跑,但已经抛异常了。如果你想在超时后立刻终止任务,得用硬时间限制或者手动杀进程。
最直接的办法是在任务装饰器里同时设置soft_time_limit和time_limit。当超过time_limit(硬限制),Celery会直接终止任务进程。下面是个例子:
from celery import Celery
from celery.exceptions import TimeLimitExceeded
app = Celery('tasks')
@app.task(soft_time_limit=5, time_limit=10)
def my_task():
try:
# 你的长时间运行逻辑
import time
time.sleep(20) # 这会触发超时
except TimeLimitExceeded:
# 硬限制超时后,任务进程会被终止,这里的代码可能执行不到
print("Hard time limit exceeded, task killed")
raise
如果任务卡死了连硬限制都等不了,你可以在代码里自己检查并退出。比如在循环里判断已执行时间:
import time
from celery import current_task
@app.task
def stubborn_task():
start_time = time.time()
max_time = 5 # 最大允许5秒
while True:
if time.time() - start_time > max_time:
# 主动退出
return "Task terminated manually"
# ... 任务逻辑
还有个狠招是用signal模块在软超时时直接终止进程,不过得小心资源清理问题:
import signal
from celery.exceptions import SoftTimeLimitExceeded
@app.task(soft_time_limit=5)
def task_with_signal():
def handler(signum, frame):
raise SystemExit("Terminated by soft time limit")
signal.signal(signal.SIGALRM, handler)
signal.alarm(5) # 5秒后发信号
try:
# 你的代码
time.sleep(10)
except SystemExit:
raise SoftTimeLimitExceeded()
finally:
signal.alarm(0) # 取消警报
简单说,设硬限制最省事。
您那种写法的话,错一个子任务 test_error,chord 就直接报错,不会返回任何内容了。
这里的 mytask 案例网上找的,对应的应该是我这里的 test_error。
昨儿我试了下,好像是我在 test_error 函数里,import 了其他耗时的模块造成的,但仍然不知道咋解决,这里因为不能 raise 错误,所以用不了 time_limit,SoftTimeLimitExceeded 在 test_error 函数(乃至主函数里)一直捕获不到。
另外,我像下面这样是可以在 180s 的时候,得到 soft time sleep 的,然后任务变成 success 状态:<br>.task(soft_time_limit=180)<br>def test_error():<br> time.sleep(500)<br><br>
你可以把完整的例子贴上来,业务逻辑省略掉,保留你想要表达的主任务,try…catch… 和子任务名,这样比较方便看调用结构,比如<br>def main():<br> slow_task.delay()<br><br>()<br>def slow_task():<br>chord( ( test_error.s(x) for x in y) , test_result.s() )().get()<br><br>()<br>def test_error(a):<br>try:<br>sleep(a)<br>catch SoftTimeLimitExceeded as e:<br>pass<br><br>()<br>def test_result(a):<br>print("all success")<br><br><br>main()<br>
这样哪里调同步哪里调异步都看的很清晰,用语言描述的话比较难 get 到吧
说实话,我仔细对比了下,您贴的那段示例代码差不多就是我的调度结构了。
SoftTimeLimitExceeded 是在 debug 信息里显示的,但是一直 catch 抓不到到,然后部分 test_error 在显示 SoftTimeLimitExceeded 后,超时了一段时间才 success,还有的就一直 starting。

