Python中如何实现asyncio与redis队列的协同工作?
问题描述
我有一个进程会不断地往 redis 队列里增加数据,上限是 5000.然后我想在另一个进程中用 asyncio 从该队列里取出 item,里面有 url 然后请求 api 获得数据。请问我应该怎么控制我的爬取进程每一次只运行一定数目的协程(比如 20 )呢?具体应该怎么写呢?就是怎么从队列获得数据然后发起请求,同时又不能同时运行太多协程。
Python中如何实现asyncio与redis队列的协同工作?
4 回复
单线程定时取 redis,然后丢线程池运行请求 task ?
import asyncio
import aioredis
import json
class AsyncRedisQueue:
def __init__(self, queue_name='task_queue', redis_url='redis://localhost'):
self.queue_name = queue_name
self.redis_url = redis_url
self.redis = None
async def connect(self):
"""建立Redis连接"""
self.redis = await aioredis.from_url(self.redis_url)
async def produce(self, data):
"""生产者:推送任务到队列"""
if not self.redis:
await self.connect()
# 将数据序列化为JSON字符串
message = json.dumps(data)
# 使用LPUSH将任务推入列表左侧
await self.redis.lpush(self.queue_name, message)
print(f"生产任务: {data}")
async def consume(self):
"""消费者:从队列获取任务"""
if not self.redis:
await self.connect()
while True:
# 使用BRPOP阻塞式获取任务(从右侧弹出)
# timeout=0表示无限等待
result = await self.redis.brpop(self.queue_name, timeout=0)
if result:
_, message = result
data = json.loads(message)
print(f"消费任务: {data}")
# 模拟任务处理
await self.process_task(data)
async def process_task(self, data):
"""模拟异步任务处理"""
await asyncio.sleep(1) # 模拟耗时操作
print(f"处理完成: {data}")
async def close(self):
"""关闭Redis连接"""
if self.redis:
await self.redis.close()
async def main():
queue = AsyncRedisQueue()
# 启动消费者协程
consumer_task = asyncio.create_task(queue.consume())
# 生产一些任务
tasks = [
{"id": 1, "type": "email", "content": "Hello"},
{"id": 2, "type": "sms", "content": "World"},
{"id": 3, "type": "notification", "content": "Test"}
]
for task in tasks:
await queue.produce(task)
await asyncio.sleep(0.5) # 控制生产速度
# 等待消费者处理(实际应用中可能需要更优雅的退出机制)
await asyncio.sleep(5)
consumer_task.cancel()
try:
await consumer_task
except asyncio.CancelledError:
pass
await queue.close()
if __name__ == "__main__":
asyncio.run(main())
核心要点:
- 使用aioredis:这是支持asyncio的Redis客户端,确保非阻塞I/O
- 队列选择:Redis List作为队列,LPUSH/BRPOP实现生产者-消费者模式
- 异步连接:在
connect()方法中异步建立连接 - 阻塞消费:
brpop在队列为空时挂起协程,不浪费CPU资源 - 任务序列化:使用JSON序列化复杂数据结构
依赖安装:
pip install aioredis
运行示例: 代码会先推送3个任务到Redis队列,然后消费者异步处理这些任务。每个任务处理间隔1秒,可以看到异步并发的效果。
一句话建议: 用aioredis配合asyncio.create_task实现非阻塞的队列处理。
用 asyncio.Semaphore 控制协程数
第一:需要异步库确保 IO 是异步的 https://github.com/aio-libs/aioredis
第二控制,协程数据和其它多进程 /线程,调度方式类似。一种是提前创建固定数量协程,不断去读数据。 另外就是两个数据建个调度个协程,.create_task/ensure_future. 达到上限就停止,或者用 Semaphore

