Python3中如何结合使用async与Tornado、aioredis和Celery?
需求是这样,启动的时候 celery 订阅一个 channel 用于接收信号 ,需要在 tornado 的触发某个路由之后,然后使用 aioredis 进行订发布,修改每个 celery 里面的进程的某个变量。 问题:
- 在 python3 下面,aioredis 与 Tornado 的结合使用会有一定的问题,Tornado 出现假死,不接收请求,如何解决?
- celery 有办法能运行 aioredis 来进行订阅吗?因为我想是如果没有收到 Redis 的特定 channel 发布时,他会照这正常的 celery 一样,接收对应的请求
Python3中如何结合使用async与Tornado、aioredis和Celery?
5 回复
py 做复杂异步程序是束手束脚了。
核心思路:Tornado处理HTTP请求,aioredis在异步环境中操作Redis,Celery处理耗时同步任务。三者结合的关键是异步事件循环的协调。
具体方案:
- Tornado + aioredis:在Tornado的异步请求处理器中直接使用aioredis。
- Tornado + Celery:Celery是同步的,不能直接
await。需要通过run_in_executor将Celery任务提交给线程池执行,避免阻塞Tornado的主事件循环。
示例代码:
import asyncio
import tornado.ioloop
import tornado.web
import aioredis
from celery import Celery
from concurrent.futures import ThreadPoolExecutor
# --- 1. 配置Celery ---
celery_app = Celery('myapp', broker='redis://localhost:6379/0')
@celery_app.task
def my_slow_sync_task(data):
# 模拟一个耗时的同步操作
import time
time.sleep(5)
return f"Processed: {data}"
# --- 2. 创建线程池执行器(用于运行Celery任务)---
thread_pool = ThreadPoolExecutor(max_workers=4)
# --- 3. Tornado异步处理器 ---
class MainHandler(tornado.web.RequestHandler):
async def get(self):
# 示例1: 异步使用aioredis
redis = await aioredis.from_url("redis://localhost:6379", decode_responses=True)
await redis.set("my_key", "async_value")
cached_value = await redis.get("my_key")
# 示例2: 结合Celery(通过线程池)
# 将Celery的delay().get()这个同步调用放到线程池中运行
celery_result = await asyncio.get_event_loop().run_in_executor(
thread_pool,
lambda: my_slow_sync_task.delay("some_data").get(timeout=10) # 同步阻塞调用
)
self.write(f"Redis got: {cached_value}, Celery result: {celery_result}")
await redis.close()
# --- 4. 启动应用 ---
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
print("Server starting on port 8888...")
tornado.ioloop.IOLoop.current().start()
关键点解释:
aioredis是异步库,可以直接在async def函数中使用await。- Celery任务 (
my_slow_sync_task.delay().get()) 是同步的。使用asyncio.get_event_loop().run_in_executor将其放入单独的线程中执行,这样就不会阻塞Tornado的主事件循环。 - 线程池 (
ThreadPoolExecutor) 的大小需要根据实际任务数量调整。
一句话总结:Tornado和aioredis原生协作,Celery任务用线程池封装避免阻塞。
请问你是 aioredis 的 pool 假死吗?
不是 pool 假死,是加载完路由之后,再请求就假死了
我这边用 tornado+peewee_async(aiopg) 也出现过假死 不知道什么原因

