Python中使用aiohttp写爬虫,如何正确停止循环?

使用 aiohttp 试着写了一个爬虫,但是发现可能会出现 一级页面还在抓取的时候,由于队列为空,直接退出的情况。不知该如何去做这个判断?另外不知以下代码这么写是否有其他的问题??

# coding:utf-8

import asyncio

import aiohttp

class Task(object): def init(self, info, priority): self.priority = priority self.info = info

def __lt__(self, other):
    return self.priority < other.priority

class Spider(object):

def __init__(self, loop=None):
    self.loop = loop
    conn = aiohttp.TCPConnector(limit=3)
    self.session = aiohttp.ClientSession(loop=loop, connector=conn)
    self.queue = asyncio.PriorityQueue()

def start(self, page):
    task_info = {'callback': self.parse_1, 'page': page}
    return task_info

async def set_item(self, task):
    pass

async def fetch(self, task):
    await asyncio.sleep(2)
    task['callback'](task['page'])

async def worker(self):
    while True:
        next_task = await self.queue.get()
        if next_task.info.get('type') == 'item':
            asyncio.ensure_future(self.set_item(next_task.info))
        else:
            asyncio.ensure_future(self.fetch(next_task.info))
        self.queue.task_done()

        # if self.queue.empty():
        #     await asyncio.sleep(1)
        #     if self.queue.empty():
        #         break

def run(self):
    for page in range(1, 10):
        self.queue.put_nowait(Task(self.start(page), 0))

    self.loop.run_until_complete(self.worker())

def close(self):
    if not self.session.closed:
        if self.session._connector_owner:
            self.session._connector.close()
        self.session._connector = None

def parse_1(self, meta):
    print('parse_1-----', meta)
    for page in range(20, 30):
        task = {'callback': self.parse_2, 'page': page}
        self.queue.put_nowait(Task(task, 1))

def parse_2(self, meta):
    print('parse2----', meta)
    for page in range(30, 40):
        task = {'callback': self.parse_3, 'page': page}
        self.queue.put_nowait(Task(task, 0))

def parse_3(self, meta):
    print('parse3----', meta)

loop = asyncio.get_event_loop() sp = Spider(loop=loop) sp.run() sp.close()


Python中使用aiohttp写爬虫,如何正确停止循环?

17 回复

我看别人写的是先把队列放到 redis 里,对 redis 进行判断从而进行开始或终止


asyncio.CancelledError 配合 asyncio.gather 或者 asyncio.wait 来优雅停止。核心是拿到任务对象然后取消。

假设你的爬虫主循环在跑多个并发任务,比如这样:

import asyncio
import aiohttp
import signal

async def fetch(session, url):
    try:
        async with session.get(url) as response:
            return await response.text()
    except asyncio.CancelledError:
        # 清理资源,比如关闭响应
        print(f"Cancelled fetch for {url}")
        raise

async def worker(name, session, queue):
    while True:
        url = await queue.get()
        try:
            html = await fetch(session, url)
            print(f"{name} fetched {url[:50]}...")
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    # 往队列里塞点任务
    for url in ['http://example.com/page1', 'http://example.com/page2']:
        await queue.put(url)

    async with aiohttp.ClientSession() as session:
        # 创建多个worker任务
        tasks = []
        for i in range(3):
            task = asyncio.create_task(worker(f'Worker-{i}', session, queue))
            tasks.append(task)

        # 等队列清空,或者设置个停止条件
        await queue.join()

        # 停止所有worker任务
        for task in tasks:
            task.cancel()
        # 等待它们被取消完成
        await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == '__main__':
    asyncio.run(main())

更常见的做法是监听系统信号(比如Ctrl+C):

async def main_with_signal():
    queue = asyncio.Queue()
    # ... 初始化队列和session ...

    tasks = []
    async with aiohttp.ClientSession() as session:
        for i in range(3):
            task = asyncio.create_task(worker(f'Worker-{i}', session, queue))
            tasks.append(task)

        # 设置信号处理
        loop = asyncio.get_running_loop()
        stop_event = asyncio.Event()

        def signal_handler():
            print("\nShutting down...")
            stop_event.set()

        loop.add_signal_handler(signal.SIGINT, signal_handler)

        try:
            # 主循环,直到收到停止信号
            while not stop_event.is_set():
                # 这里可以加一些生产任务到队列的逻辑
                await asyncio.sleep(0.1)  # 防止空转
        finally:
            # 取消所有任务
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

# 运行这个版本
if __name__ == '__main__':
    asyncio.run(main_with_signal())

关键就两点:1. 在任务里捕获 asyncio.CancelledError 做清理;2. 在外部用 task.cancel() 触发取消,然后 gather 等它结束。

简单说就是:取消任务并妥善处理取消异常。

感觉用 redis 应该也会有这个问题。。。除非是标记一下任务状态

woker 函数里别 while True;弄一个标记,如果为 True 继续,为 False 就停止,

但爬取何时完成,无法预料,也就无法给设置为 False 啊

加个 work in progress 标记

手机看的好难受,没看完,你试试 work 那里使用 wait_for 替换 ensure_future

判断任务队列用 join 而不是 empty

请教一下这个标记的动作应该放在哪里执行??
替换后无法执行。。。

呃,判断队列为空不是 empty 么?

empty 是判断队列是否为空,join 是阻塞至所有任务完成,也就是调用 task_done。你用 empty 判断队列为空,只是所有的任务都被 get 了,不代表已经完成了。你可以看看官方的例子 https://docs.python.org/3/library/asyncio-queue.html

#8 任何 worker 没处理完之前都是 WIP 啊,除了要判断队列是否为空,还要判断是否有任务是 WIP

#9 谢回复!代码中是在 while 中从队列获取任务直接注册到事件循环,然后就执行了 task_done。这么如果用 join 去判断的话应该也是一样的,我尝试用链接中的例子去改写上面的爬虫代码,但是好像也是行不通,会一直阻塞。不知何故。

#10 谢回复!判断任务状态,在官方文档中找到了 all_tasks 和 current_task 这两个方法,但是好像不好使,即使任务全部完成也不为 None,导致判断失败。。

<br> async def worker(self):<br> """<br> 任务调度<br> :return:<br> """<br> while True:<br> if not self.queue.empty():<br> next_data = await self.queue.get()<br> task_id = uuid.uuid1()<br> self.task_running_list.append(task_id)<br> if isinstance(next_data, Item):<br> asyncio.create_task(self.set_item(task_id, <a target="_blank" href="http://next_data.info" rel="nofollow noopener">next_data.info</a>))<br> else:<br> asyncio.create_task(self.fetch(task_id, <a target="_blank" href="http://next_data.info" rel="nofollow noopener">next_data.info</a>))<br> self.queue.task_done()<br> else:<br> await asyncio.sleep(0)<br> if self.queue.empty() and not self.task_running_list:<br> break<br>
根据 的提示,这么处理可解决该问题。

#12 处理完要把 task_id 删掉啊

嗯嗯,删除操作是在 fetch 方法里面执行的

怎么贴的代码?

建议 aio+mq

回到顶部