Python中并发爬虫时把返回结果的处理放到消息队列中可行吗?

简单来说是我在想能不能把我想要下载的网站 url1, url2, url3 … 等用 Requests 库发送请求后作为生产者放到消息队列里面,当这特定的 urlN 有返回包时消费者就从消息队列里面读取网站内容。

不知道技术上靠谱么?
Python中并发爬虫时把返回结果的处理放到消息队列中可行吗?

21 回复

听起来你打算用一个很慢的队列技术来实现异步非阻塞框架的功能


当然可行,而且这是处理并发爬虫结果的一种非常经典和高效的设计模式。

简单来说,就是把“数据抓取”和“数据处理”这两个阶段解耦。爬虫线程/进程只负责拼命发请求、收响应,然后把原始的响应体(比如HTML、JSON)或者初步提取的URL,作为一个“消息”快速扔到消息队列(比如RabbitMQ、Redis、Kafka)里。然后,另一组消费者进程再从队列里取出这些消息,进行解析、清洗、存储等可能比较耗时的操作。

这么做有几个明显的好处:

  1. 缓冲与削峰:爬虫的抓取速度可能忽快忽慢,下游处理可能遇到数据库慢、解析复杂等情况。队列作为一个缓冲区,防止数据丢失或系统被压垮。
  2. 解耦与扩展:你可以独立地扩展爬虫节点或消费者节点。比如,加机器专门抓取,或者加机器专门做数据清洗,两边互不影响。
  3. 提高爬虫效率:爬虫线程不用等待数据处理完成,发完消息就能立刻进行下一个请求,IO利用率更高。
  4. 容错性:如果数据处理服务挂了,消息会积压在队列里,等服务恢复后继续处理,数据不会丢。

下面是一个使用 concurrent.futures 进行并发抓取,并结合 redis 作为简单消息队列的示例。这里用 Redis 的 lpush/rpop 模拟队列。

首先,确保安装 redisrequests

pip install redis requests
import concurrent.futures
import requests
import redis
import json
from urllib.parse import urljoin
import time

# 初始化Redis连接 (作为消息队列)
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
QUEUE_NAME = 'crawler_results'

# 模拟要抓取的URL列表
start_urls = [
    'https://httpbin.org/html',
    'https://httpbin.org/json',
    'https://httpbin.org/xml',
    # ... 更多URL
]

def fetch_url(url):
    """爬虫工作者:只负责抓取,并把结果放入消息队列"""
    try:
        print(f"[Fetcher] 正在抓取: {url}")
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()

        # 构造一个消息,包含原始内容和一些元数据
        message = {
            'url': url,
            'status_code': resp.status_code,
            'content': resp.text,  # 对于生产环境,可能需要考虑内容大小
            'fetched_at': time.time()
        }
        # 将消息序列化为JSON字符串后,推入Redis队列的右侧
        redis_client.rpush(QUEUE_NAME, json.dumps(message))
        print(f"[Fetcher] 已推送结果: {url}")

    except Exception as e:
        print(f"[Fetcher] 抓取 {url} 时出错: {e}")
        # 你也可以选择将错误信息放入另一个队列供监控处理

def process_worker(worker_id):
    """消费者工作者:从队列中取出结果并处理"""
    print(f"[Worker-{worker_id}] 启动")
    while True:
        # 从队列左侧阻塞弹出消息,超时时间5秒
        # 使用 `blpop` 可以避免忙等待,更高效
        _, message_json = redis_client.blpop(QUEUE_NAME, timeout=5)
        if message_json is None:
            # 超时,可以视为队列暂时为空,可以休息或退出
            print(f"[Worker-{worker_id}] 队列空,等待...")
            time.sleep(2)
            # 这里为了演示,我们简单退出。实际应用中可能有更复杂的停止逻辑。
            # break
            continue

        message = json.loads(message_json)
        url = message['url']
        content = message['content']

        print(f"[Worker-{worker_id}] 开始处理: {url}")

        # 模拟耗时的数据处理,例如解析HTML、提取数据、存入数据库等
        try:
            # 这里简单打印内容长度和类型
            content_type = 'HTML' if '<html>' in content.lower() else 'JSON' if content.strip().startswith('{') else 'Other'
            print(f"[Worker-{worker_id}] 处理完成: {url} -> 类型: {content_type}, 长度: {len(content)}")
            # time.sleep(0.5) # 模拟处理耗时
        except Exception as e:
            print(f"[Worker-{worker_id}] 处理 {url} 时出错: {e}")

if __name__ == '__main__':
    # 第1步:启动消费者进程 (这里用线程模拟)
    # 在实际项目中,消费者通常是独立的进程或服务,甚至在不同的机器上。
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # 提交两个消费者任务
        future_to_worker = {executor.submit(process_worker, i): i for i in range(2)}

        # 第2步:使用线程池并发抓取
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as fetcher_executor:
            # 提交所有抓取任务
            fetch_futures = [fetcher_executor.submit(fetch_url, url) for url in start_urls]

            # 等待所有抓取任务完成 (非必须,取决于你的设计)
            for future in concurrent.futures.as_completed(fetch_futures):
                # 这里可以检查抓取任务是否有异常
                pass

        print("[Main] 所有抓取任务已提交。消费者将继续处理队列中的消息...")
        # 主线程可以在这里等待,或者做其他事情。
        # 由于消费者是守护线程(默认),主线程结束它们也会结束。
        # 这里我们让主线程等待一段时间,观察消费者处理。
        time.sleep(10)
        print("[Main] 演示结束。在实际应用中,消费者应作为独立服务持续运行。")

总结一下:这个架构很靠谱,特别适合大规模、需要稳定性和扩展性的爬虫项目。对于小项目,可能有点“杀鸡用牛刀”,但理解这个模式对设计健壮的系统很有帮助。

一句话建议:用消息队列解耦抓取和处理,是构建健壮并发爬虫的常用方案。

这就是分布式的雏形。。。。

这没有什么问题,简单点用 redis 更好点有 Kafka 然后开多进程去前面队列读取数据进行处理

可以做得到,技术上没什么问题,比如 tornado 框架、asyncio 就可以做到

我想了解 如何 拿到特定的返回包

还真试过放到 redis 里面的 SET 集合里。
我用 Go 做过一个搜索引擎,当时就是把爬回来的结果发给消费者,消费者处理后把结果放到 Redis 的 SET 里面,同时还能去重,但是有个问题,消费者处理到的 url 越来越多,然后 Redis 存放的体积越来越大。。。

scrapy-redis python 安装这个就好了

他的意思是要把网络 io 变成非阻塞的模式

socket 可 read 信号

可以啊,这不是很常用方案。要对数据做清洗处理才能入库。

请教一下,这个 socket 要监听 什么参数,能拿到特定的包返回
还有就是 这个过程会有 包丢失吗?

没问题啊 我现在就这么做的 更奢侈的是 我回吧 request 返回结果 直接存到 oss 里 然后把 key 丢到队列里

不过 request 可以直接发异步请求啊 加一个回调 就可以在收到 response 时处理

可以分享一下你的消息队列技术细节么?

linux 系统中把很多功能抽象成文件,比如 socket 也是。简单说就是,对方发送回响应到达本机网卡后,内核会找到等待处理的进程,通知它可以读取数据了。

你去看一下 pyspider 的源码,它就是这么干的。
调度器,下载器,结果处理器通过消息队列沟通,可以在一台机器上跑调度器,另外几台机器跑多个下载器和处理器,实现分布式。

基本跟楼上说的一样 要抓取的连接和抓取的内容都通过消息队列交互 如果是你的需求可以直接用一个异步的 request 工具

技术上没什么问题,但是脱离业务场景谈架构是无意义的。
队列最根本的目的还是解耦,你这里说将队列放置在请求之后,而调度与请求还是一个高耦合的状态。
另外,我引入一个业务场景,假设 url ( url1, url2, …)有抓取优先级,那么你这个设计如何满足需求?

回到顶部