Python3中如何结合使用async与Tornado、aioredis和Celery?

需求是这样,启动的时候 celery 订阅一个 channel 用于接收信号 ,需要在 tornado 的触发某个路由之后,然后使用 aioredis 进行订发布,修改每个 celery 里面的进程的某个变量。 问题:

  1. 在 python3 下面,aioredis 与 Tornado 的结合使用会有一定的问题,Tornado 出现假死,不接收请求,如何解决?
  2. celery 有办法能运行 aioredis 来进行订阅吗?因为我想是如果没有收到 Redis 的特定 channel 发布时,他会照这正常的 celery 一样,接收对应的请求

Python3中如何结合使用async与Tornado、aioredis和Celery?

5 回复

py 做复杂异步程序是束手束脚了。


核心思路:Tornado处理HTTP请求,aioredis在异步环境中操作Redis,Celery处理耗时同步任务。三者结合的关键是异步事件循环的协调

具体方案

  1. Tornado + aioredis:在Tornado的异步请求处理器中直接使用aioredis。
  2. Tornado + Celery:Celery是同步的,不能直接await。需要通过run_in_executor将Celery任务提交给线程池执行,避免阻塞Tornado的主事件循环。

示例代码

import asyncio
import tornado.ioloop
import tornado.web
import aioredis
from celery import Celery
from concurrent.futures import ThreadPoolExecutor

# --- 1. 配置Celery ---
celery_app = Celery('myapp', broker='redis://localhost:6379/0')

@celery_app.task
def my_slow_sync_task(data):
    # 模拟一个耗时的同步操作
    import time
    time.sleep(5)
    return f"Processed: {data}"

# --- 2. 创建线程池执行器(用于运行Celery任务)---
thread_pool = ThreadPoolExecutor(max_workers=4)

# --- 3. Tornado异步处理器 ---
class MainHandler(tornado.web.RequestHandler):
    async def get(self):
        # 示例1: 异步使用aioredis
        redis = await aioredis.from_url("redis://localhost:6379", decode_responses=True)
        await redis.set("my_key", "async_value")
        cached_value = await redis.get("my_key")

        # 示例2: 结合Celery(通过线程池)
        # 将Celery的delay().get()这个同步调用放到线程池中运行
        celery_result = await asyncio.get_event_loop().run_in_executor(
            thread_pool,
            lambda: my_slow_sync_task.delay("some_data").get(timeout=10)  # 同步阻塞调用
        )

        self.write(f"Redis got: {cached_value}, Celery result: {celery_result}")
        await redis.close()

# --- 4. 启动应用 ---
def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    print("Server starting on port 8888...")
    tornado.ioloop.IOLoop.current().start()

关键点解释

  • aioredis 是异步库,可以直接在 async def 函数中使用 await
  • Celery任务 (my_slow_sync_task.delay().get()) 是同步的。使用 asyncio.get_event_loop().run_in_executor 将其放入单独的线程中执行,这样就不会阻塞Tornado的主事件循环。
  • 线程池 (ThreadPoolExecutor) 的大小需要根据实际任务数量调整。

一句话总结:Tornado和aioredis原生协作,Celery任务用线程池封装避免阻塞。

请问你是 aioredis 的 pool 假死吗?

不是 pool 假死,是加载完路由之后,再请求就假死了

我这边用 tornado+peewee_async(aiopg) 也出现过假死 不知道什么原因

回到顶部