Python中如何在Tornado协程中获取Celery任务返回值

@tornado.web.asynchronous
    @tornado.gen.coroutine
    @auth_token
    @log_record
    def post(self):
        """
        bussiness_name
        description
        :return:
        """
        resultCode = self.request.headers.get('resultCode', -1)
        resultData = self.request.headers.get('resultData', {})
        user_id = resultData['user_id']
        token = None
        data = {}
        body = json.loads(self.request.body)
        data['business_name'] = body['name']
        data['system_user_id'] = body['user_id']
        data['description'] = body['description']
        remote_call_result = get_business_result.apply_async(args=["POST", user_id, data])
        # response = remote_call_result.wait()
        # self.on_post_result(response)
        response = yield tornado.gen.Task(remote_call_result.wait())
        self.on_post_result(response)
def on_post_result(self, response):
    di_result = json.dumps(response)
    logger.info("verify, result:%s \n" % di_result)
    self.write(json.dumps(di_result))
    self.finish()

代码如上,使用 yield gen.Task 无法获取获得的 response,celery 任务是一个 rpc call, 有什么方法可以异步来获取结果?


Python中如何在Tornado协程中获取Celery任务返回值

6 回复

这里是一个 handler 上面的一个 post 方法


在Tornado协程中获取Celery任务返回值,核心是使用tornado-celery库或手动将Celery的异步结果转换为Tornado的Future。这里给你一个直接可用的方案:

import tornado.ioloop
import tornado.web
from celery import Celery
from tornado import gen
from tornado.concurrent import Future

# 1. 配置Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@celery_app.task
def add(x, y):
    import time
    time.sleep(2)  # 模拟耗时操作
    return x + y

# 2. 手动适配器(关键部分)
def celery_async_to_tornado_future(task_result):
    """将Celery的AsyncResult转换为Tornado的Future"""
    future = Future()
    
    def check_result():
        if task_result.ready():
            try:
                future.set_result(task_result.get())
            except Exception as e:
                future.set_exception(e)
        else:
            # 未完成则继续轮询
            tornado.ioloop.IOLoop.current().call_later(0.1, check_result)
    
    tornado.ioloop.IOLoop.current().add_callback(check_result)
    return future

# 3. Tornado处理器
class MainHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        # 启动Celery任务
        async_result = add.delay(10, 20)
        
        # 转换为Tornado Future并等待
        future = celery_async_to_tornado_future(async_result)
        result = yield future
        
        self.write(f"计算结果: {result}")

# 4. 应用启动
def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    print("服务启动: http://localhost:8888")
    tornado.ioloop.IOLoop.current().start()

工作原理:

  1. add.delay() 提交任务到Celery,返回AsyncResult对象
  2. celery_async_to_tornado_future() 创建Tornado的Future,并轮询检查Celery任务状态
  3. 使用yield future在协程中等待结果,不阻塞事件循环

需要安装的包:

pip install tornado celery redis

运行前准备:

  • 启动Redis:redis-server
  • 启动Celery worker:celery -A your_module_name.celery_app worker --loglevel=info

这样就能在Tornado协程中无缝获取Celery任务结果了。本质就是做个异步适配器。

总结建议:用Future适配器桥接两个异步系统。

尝试过在 celery 的 apply_async 添加 callback=self.on_post_result,但是也无法运行收尾的 self.on_post_result

yield future

yield remote_call_result 这里应该没有用吧?因为 celery 的 Result 类是需要用 get 或者 wait 来获取返回值的结果

用 future,内部轮询获取结果

回到顶部