Python中如何实现asyncio与redis队列的协同工作?

问题描述

我有一个进程会不断地往 redis 队列里增加数据,上限是 5000.然后我想在另一个进程中用 asyncio 从该队列里取出 item,里面有 url 然后请求 api 获得数据。请问我应该怎么控制我的爬取进程每一次只运行一定数目的协程(比如 20 )呢?具体应该怎么写呢?就是怎么从队列获得数据然后发起请求,同时又不能同时运行太多协程。


Python中如何实现asyncio与redis队列的协同工作?
4 回复

单线程定时取 redis,然后丢线程池运行请求 task ?


import asyncio
import aioredis
import json

class AsyncRedisQueue:
    def __init__(self, queue_name='task_queue', redis_url='redis://localhost'):
        self.queue_name = queue_name
        self.redis_url = redis_url
        self.redis = None
        
    async def connect(self):
        """建立Redis连接"""
        self.redis = await aioredis.from_url(self.redis_url)
        
    async def produce(self, data):
        """生产者:推送任务到队列"""
        if not self.redis:
            await self.connect()
            
        # 将数据序列化为JSON字符串
        message = json.dumps(data)
        
        # 使用LPUSH将任务推入列表左侧
        await self.redis.lpush(self.queue_name, message)
        print(f"生产任务: {data}")
        
    async def consume(self):
        """消费者:从队列获取任务"""
        if not self.redis:
            await self.connect()
            
        while True:
            # 使用BRPOP阻塞式获取任务(从右侧弹出)
            # timeout=0表示无限等待
            result = await self.redis.brpop(self.queue_name, timeout=0)
            
            if result:
                _, message = result
                data = json.loads(message)
                print(f"消费任务: {data}")
                
                # 模拟任务处理
                await self.process_task(data)
                
    async def process_task(self, data):
        """模拟异步任务处理"""
        await asyncio.sleep(1)  # 模拟耗时操作
        print(f"处理完成: {data}")
        
    async def close(self):
        """关闭Redis连接"""
        if self.redis:
            await self.redis.close()

async def main():
    queue = AsyncRedisQueue()
    
    # 启动消费者协程
    consumer_task = asyncio.create_task(queue.consume())
    
    # 生产一些任务
    tasks = [
        {"id": 1, "type": "email", "content": "Hello"},
        {"id": 2, "type": "sms", "content": "World"},
        {"id": 3, "type": "notification", "content": "Test"}
    ]
    
    for task in tasks:
        await queue.produce(task)
        await asyncio.sleep(0.5)  # 控制生产速度
    
    # 等待消费者处理(实际应用中可能需要更优雅的退出机制)
    await asyncio.sleep(5)
    consumer_task.cancel()
    
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass
        
    await queue.close()

if __name__ == "__main__":
    asyncio.run(main())

核心要点:

  1. 使用aioredis:这是支持asyncio的Redis客户端,确保非阻塞I/O
  2. 队列选择:Redis List作为队列,LPUSH/BRPOP实现生产者-消费者模式
  3. 异步连接:在connect()方法中异步建立连接
  4. 阻塞消费brpop在队列为空时挂起协程,不浪费CPU资源
  5. 任务序列化:使用JSON序列化复杂数据结构

依赖安装:

pip install aioredis

运行示例: 代码会先推送3个任务到Redis队列,然后消费者异步处理这些任务。每个任务处理间隔1秒,可以看到异步并发的效果。

一句话建议: 用aioredis配合asyncio.create_task实现非阻塞的队列处理。

用 asyncio.Semaphore 控制协程数

第一:需要异步库确保 IO 是异步的 https://github.com/aio-libs/aioredis
第二控制,协程数据和其它多进程 /线程,调度方式类似。一种是提前创建固定数量协程,不断去读数据。 另外就是两个数据建个调度个协程,.create_task/ensure_future. 达到上限就停止,或者用 Semaphore

回到顶部