Python分布式任务队列Celery中检查返回状态(result.ready)时出现死循环怎么办?

RT:小弟这边为了保证某 task 优先运行完成,采用了检查结果返回状态的法子:

while not result.ready():

比如下面的样例代码,任务没有 ready 就一直循环( PS:用 result.get 也会出现类似的情况):

from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():
    time.sleep(1)
print 'task done: {0}'.format(result.get())

结果小弟发现那个需要优先运行的任务,就一直处在 received 的状态,result.ready()的返回一直是 False。 此前以为 task 函数的原因,检查了并没有发现问题。

很急,在线求大神解答为啥会出现这种情况。 感谢!


Python分布式任务队列Celery中检查返回状态(result.ready)时出现死循环怎么办?

21 回复

while True:


遇到 result.ready() 检查死循环,通常是同步等待异步结果的典型问题。核心原因在于 ready() 只检查状态,不阻塞,在循环中如果任务未完成,就会空转。

最直接的解决方法是使用 Celery 提供的同步获取结果的方法,替代手动循环检查。这里有几种标准做法:

1. 使用 result.get(timeout=timeout) 阻塞等待 这是最推荐的方式。它会阻塞直到任务完成或超时,你可以设置超时时间避免无限等待。

from celery.result import AsyncResult

# 假设你有一个任务ID
task_id = "your-task-id-here"
result = AsyncResult(task_id, app=your_celery_app)

try:
    # 阻塞等待,最多等10秒
    final_result = result.get(timeout=10)
    print(f"任务结果: {final_result}")
except TimeoutError:
    print("任务等待超时")
except Exception as e:
    print(f"任务执行失败: {e}")

2. 使用 result.get() 不带超时(谨慎使用) 这会一直阻塞到任务完成。通常只在明确知道任务会很快完成时使用,或在可控的环境下(如脚本)。

result = AsyncResult(task_id, app=app)
# 一直等待,直到任务完成返回结果或抛出异常
final_result = result.get()

3. 使用 result.wait() 方法(类似 get wait() 也是阻塞调用,行为与 get() 类似,但可以设置超时和轮询间隔。

result = AsyncResult(task_id, app=app)
try:
    final_result = result.wait(timeout=10, interval=0.5) # 每0.5秒检查一次
except TimeoutError:
    print("超时")

为什么不要用 while not result.ready(): 循环? 因为 ready() 调用成本极低,在任务运行期间(可能几秒甚至几分钟),这个循环会以极高的频率空转,白浪费CPU,代码看起来也不专业。上面的 get()wait() 在底层会以更高效的方式(如基于事件或条件变量)等待,不会忙等。

如果你必须在循环中做其他事,那也应该加上休眠,但依然不推荐:

# 迫不得已时的改进(仍然不优雅)
while not result.ready():
    time.sleep(0.1)  # 至少让出CPU
    # 这里可以做一些其他小事情
# 循环结束后再用 result.get() 取结果

总结:别手搓轮询,直接用 result.get(timeout=...)

while True:
if result.ready():
break


这样才是循环,你那个一开始任务是 false,循环都进不去


result.ready()结果开始就是 False 啊,循环是进去了的,time.sleep(1)会一直下去,这个我试过打印内容印证过的。
但我要做的那个任务始终在 PENDING,或者说是 RECEIVED 状态,并没有开始做,不用循环,直接 result.ready()是可以运行那个任务的,但是却没有锁住的效果了。

这段代码就是例子代码,没问题,请教别人的时候给出详细的信息,用的是 redis 还是 rabbitmq,配置贴上來


首先感谢这位朋友,贴下 redis 的配置如下:
celery = Celery(‘achrief’, backend=‘redis://localhost:6379/0’, broker=‘redis://localhost:6379/0’)
我这边仔细比较过,真实代码跟样例情况是一模一样的,换了变量和函数名而已,这里就不贴了。
另外,celery 配置文件里添加了以下内容,不过并没有起作用:
CELERY_IGNORE_RESULT=False
另外,我这边是 Linux 环境,有朋友说的 win 下运行可以加-p threads 或者啥 pool 参数的我也试过了,没有起作用。

celery server 运行起来了么?

程序一直是正常运行的,只不过最近加了检查返回状态才这样,因此应该不是 celery server 的锅。


我那边单独测了上面提到的 demo 代码,是可用的。
但我这边是同时运行了多个任务,其中夹杂了这个优先级高的任务,就会出问题了,代码如下:
if item in match_items:
result = eval(item).delay(x,y)
result_list.append(eval_result)
else:
eval(item).delay(x,y)
for result_item in result_list:
while not result_item.ready():
time.sleep(2)
然后那个任务会一直在 PENDING,或者说是 RECEIVED 状态,并没有启动,一直 sleep。

其他任务都不是 delay 执行的吧?

也是 delay 执行的,8 楼给出了那部分的代码,兄弟麻烦给看看。

你好,8 楼的代码贴的再完整点,解释下 eval 用在这里的意图,还有这段代码不是放到任务里面的吧,感觉问题就在这块了

你这个逻辑是不是应该用 chain 或者 callback ?

eval 是为了把前端传过来的字符串,作为函数名执行哈,咱们可以把它这样等价:
eval(item).delay(x,y) == add.delay(x,y)
另外,确实这段代码是放在一个任务函数里去调用的,用不用 delay 都出现了这种情况。

感谢回复,不过小弟学识有限,没有理解您的意思,您能稍微举个例子么?


加我们的群问效率高些,一群工程师组建的面向初学者的
Python Linux 学习群,qq 群号:278529278,
Php Linux 学习群,qq 群号:476648701,
非商业性质,拒绝广告,只接收真正想学这方面技术的朋友,交流学习,申请请说明来自 v2ex

他说的没错,你这个逻辑不对,最好是改用 chain,而不是在一个任务里面调用另一个任务,官网中有对这个的介绍,还有你可以用个字典名字映射函数,不要用 eval,外部传入的都不可信

eval 之前已经对能执行恶意命令的特殊字符进行了过滤,也不涉及库查询,所以应该不会有安全方面的问题。
我去研究下 chain 和怎么用字典名字映射函数,先谢谢您。
这问题也是够折腾的,这两天在一步步替换正常的配置和代码进行对比,真是头疼,问题出现的有点隐蔽。

我再啰嗦下,eval 你是防不住的,相信我,只要你放到生产环境上,各种猥琐的注入方法会让你大开眼界

非常感谢大牛们的指导,确实黑名单难以过滤完全特殊字符。另外我似乎查出问题所在了,确实是 result.ready()是放在 delay()任务中执行,那里就会阻塞运行不下去了。
我这边前端采用了 flower 的接口控制任务的启动,话说 flower 有参数可以不调用 delay 么?
另外,我试了下如果不调用 delay 的话,不使用 chain 也是可以用的。
是否 flower 前端传过来的参数是必须调用 delay,然后必须用 chain 或者 signature 之类的才能破?


两位大牛,不好意思再次打扰一下。
我这边看了下,我前端 flower 通过参数传过来,进行程序启动的那个函数,肯定是需要 delay 的,调用方式就下面两种:
http://flower-docs-cn.readthedocs.io/zh/latest/api.html#post-api-task-async-apply
http://flower-docs-cn.readthedocs.io/zh/latest/api.html#post-api-task-send-task
那么问题来了,我看了下案例:
http://celery.readthedocs.io/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks
这里调用 chain 是不能使用 delay 的函数的,好像是需要在一个普通函数或者在程序主体里去调用 chain 的。
那么在程序入口需要在 delay 的函数里的情况下,我应该如何去使用 chain 呢?这好像是个悖论,没想明白应该怎么弄。

回到顶部