Python中如何在Tornado协程中获取Celery任务返回值
@tornado.web.asynchronous
@tornado.gen.coroutine
@auth_token
@log_record
def post(self):
"""
bussiness_name
description
:return:
"""
resultCode = self.request.headers.get('resultCode', -1)
resultData = self.request.headers.get('resultData', {})
user_id = resultData['user_id']
token = None
data = {}
body = json.loads(self.request.body)
data['business_name'] = body['name']
data['system_user_id'] = body['user_id']
data['description'] = body['description']
remote_call_result = get_business_result.apply_async(args=["POST", user_id, data])
# response = remote_call_result.wait()
# self.on_post_result(response)
response = yield tornado.gen.Task(remote_call_result.wait())
self.on_post_result(response)
def on_post_result(self, response):
di_result = json.dumps(response)
logger.info("verify, result:%s \n" % di_result)
self.write(json.dumps(di_result))
self.finish()
代码如上,使用 yield gen.Task 无法获取获得的 response,celery 任务是一个 rpc call, 有什么方法可以异步来获取结果?
Python中如何在Tornado协程中获取Celery任务返回值
6 回复
这里是一个 handler 上面的一个 post 方法
在Tornado协程中获取Celery任务返回值,核心是使用tornado-celery库或手动将Celery的异步结果转换为Tornado的Future。这里给你一个直接可用的方案:
import tornado.ioloop
import tornado.web
from celery import Celery
from tornado import gen
from tornado.concurrent import Future
# 1. 配置Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@celery_app.task
def add(x, y):
import time
time.sleep(2) # 模拟耗时操作
return x + y
# 2. 手动适配器(关键部分)
def celery_async_to_tornado_future(task_result):
"""将Celery的AsyncResult转换为Tornado的Future"""
future = Future()
def check_result():
if task_result.ready():
try:
future.set_result(task_result.get())
except Exception as e:
future.set_exception(e)
else:
# 未完成则继续轮询
tornado.ioloop.IOLoop.current().call_later(0.1, check_result)
tornado.ioloop.IOLoop.current().add_callback(check_result)
return future
# 3. Tornado处理器
class MainHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
# 启动Celery任务
async_result = add.delay(10, 20)
# 转换为Tornado Future并等待
future = celery_async_to_tornado_future(async_result)
result = yield future
self.write(f"计算结果: {result}")
# 4. 应用启动
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
print("服务启动: http://localhost:8888")
tornado.ioloop.IOLoop.current().start()
工作原理:
add.delay()提交任务到Celery,返回AsyncResult对象celery_async_to_tornado_future()创建Tornado的Future,并轮询检查Celery任务状态- 使用
yield future在协程中等待结果,不阻塞事件循环
需要安装的包:
pip install tornado celery redis
运行前准备:
- 启动Redis:
redis-server - 启动Celery worker:
celery -A your_module_name.celery_app worker --loglevel=info
这样就能在Tornado协程中无缝获取Celery任务结果了。本质就是做个异步适配器。
总结建议:用Future适配器桥接两个异步系统。
尝试过在 celery 的 apply_async 添加 callback=self.on_post_result,但是也无法运行收尾的 self.on_post_result
yield remote_call_result 这里应该没有用吧?因为 celery 的 Result 类是需要用 get 或者 wait 来获取返回值的结果
用 future,内部轮询获取结果


