How to fix a Python asyncio download crawler that won't stop

A week ago I took a rough first pass at writing async crawlers with asyncio + aiohttp, and three days ago, for practice, I wrote an Instagram download crawler. A sketch of the design is in a.png; in words: I started from the official aiohttp crawler example, crawl.py, and my plan was as follows:

1. Get the user id; the user id is required, so I made this a method that is called directly when the class is created.

2. Fetch one page of data; there is no notion of a page number here, so the request takes three parameters: the user id, the number of items to fetch at once, and end_cursor, which is used to fetch the next page.

3. Parse the data; the response is JSON, and I need two things from it: the image links, and the end_cursor used to fetch the next page.

4. Process the urls; this method iterates over the urls and calls the download method on each.

5. Download; this uses aiohttp and aiofiles. If there was no exception, after the download finishes I call asyncio.Task(self.get_display_urls(end_cursor)) to go back to the page-fetching method, and loop like that. The page-fetching method does check whether end_cursor is empty, and if so calls loop.stop() directly.

To grab all the images I didn't use run_until_complete, because it fetched only one page and then stopped; instead I used run_forever.
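(For context, a minimal sketch of why run_until_complete appears to fetch once and then stop: it only waits for the one coroutine you pass it, not for tasks that coroutine spawns with ensure_future. The coroutine names below are stand-ins, not the crawler's real methods.)

import asyncio

children = []

async def fetch_page():
    await asyncio.sleep(0.1)  # the single "fetch"
    for _ in range(3):        # spawn downloads without awaiting them
        children.append(asyncio.ensure_future(asyncio.sleep(0.1)))

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_page())  # returns as soon as fetch_page itself returns;
                                       # the three child tasks are still pending here
loop.run_until_complete(asyncio.gather(*children))  # awaiting them explicitly lets the loop finish cleanly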

The full code:

import aiohttp
import asyncio
import aiofiles
import aioredis
import re
import json
import os
import signal
import time
import logging

class Instagram(object):

    def __init__(self, username, loop, depth=0, maxtasks=200):
        """
        :param username: username
        :param loop: event loop
        :param depth: number of pages to download
        :param maxtasks: maximum concurrency
        """
        self.down_tasks = set()
        self.down_todo = set()
        self.down_busy = set()
        self.down_done = {}

        self.loop = loop
        self.sem = asyncio.Semaphore(maxtasks, loop=loop)

        self.username = username
        self.max_page = depth if depth >= 1 else -1

        self.ROOT_URL = 'https://www.instagram.com/'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36',
            'Cookie': 'rur=ATN; mid=XM2K8QAEAAGy8fiEf1b2T05Pssas; fbm_124024574287414=base_domain=.instagram.com; fbsr_124024574287414=ns7o0TqnERhbPihnN390KYuDdI7xVM2vgUunMZT4URY.eyJjb2RlIjoiQVFESlVpaVhaSFNwWnBTZ2VGUE1nUGlfUXlsdElpRG9vOHJDdHB3Qm14Q25rNUx6YnJsNHdBX1JRVnowaDREU3J4ZzFGTWVHWHdlWFlhVGxuVi0yMk84ZXdlUVBNWTg5bVF6MFg5RG40b3psSEozTGk4WW40N1lPeFQzdE0yQUNJWkg5SWh1VmhpRHBoaXZ4ZXNMM3dhc2hMcHdQQ2RkSDZWR2FQMlR1QVM4V3U1SElGTERWaEpfYzl3akstem94TFl3QWRESE9wSjNwcDlhTjVhcXFBWGlWM0lfNTducGZ0cmpCWlFLd2xUZzlYZjBEbUlFdmR5RTBsMng3OEY0RkJ6Q1NtNWEzQ2RISTRYckVqNXB6LWVrYjRyNHRza05HOUhHUmZSaXAwS0hya1VqQ3l4T3YwNDBEU2txOHI4MGJvZG9GU3o4THFHelpSckZ4dldVMjNUWGhkZ2d6MTEzbHNfVnN5T1V5X01EUHZlSHVtUkQ5bXJ1V01ObGUxOFBuV2hvIiwidXNlcl9pZCI6IjEwMDAyNDA3NTU3MTE2NyIsImFsZ29yaXRobSI6IkhNQUMtU0hBMjU2IiwiaXNzdWVkX2F0IjoxNTU3MDQ0NTE0fQ; csrftoken=2JzdvnHL9iMuxbV7KiJcASk8RlKuYWAQ; shbid=2545; shbts=1557044558.2494695; ds_user_id=5561946202; sessionid=5561946202%3AwE5Vb00lI1bmIb%3A23; urlgen="{"2001:19f0:7001:1e1d:5400:1ff:fef7:67fd": 20473}:1hND0O:dQodCbp0SM_24vfenOyhBT-Curk"'
        }
        self.proxy = "http://localhost:8001"
        t = asyncio.ensure_future(self.init(), loop=loop)
        loop.run_until_complete(t)

    async def run(self):
        """
        :return:
        """
        await self.init()
        t = asyncio.ensure_future(self.addurls(), loop=self.loop)
        while self.down_busy:
            await asyncio.sleep(1, loop=self.loop)
        await t
        self.loop.close()

    async def init(self):
        """
        Initialize the required parameter: user id
        :return:
        """
        print('[init] initializing parameters...')
        shared_data = await self.get_shared_data()
        if not shared_data:
            print('!!!!!!!')
            exit(0)
        self.user_id = re.findall('"logging_page_id":.?"profilePage_(.*?)"', shared_data)[0]

    async def _http_request(self, url, **kwargs):
        """
        Make an HTTP request.
        :param url: request url
        :param kwargs: request parameters
        :return: response body
        """
        params = dict()
        if kwargs:
            for k, v in kwargs.items():
                params.update(v)
        async with self.sem:
            async with aiohttp.ClientSession() as session:
                try:
                    async with session.get(url, timeout=10, proxy=self.proxy, headers=self.headers,
                                           params=params) as response:
                        html = (await response.read()).decode('utf-8', 'replace')
                        return html
                except Exception as exc:
                    logging.warning("[_http_request] exception: {}".format(exc))

    async def get_shared_data(self):
        """
        Fetch the page's shared data.
        :return:
        """
        html = await self._http_request(self.ROOT_URL + self.username)
        if html:
            shared_data = html.split("window._sharedData = ")[1].split(";</script>")[0]
            return shared_data

    def get_ends_cursor(self, html):
        """
        Extract the end_cursor from the response JSON.
        :param html:
        :return:
        """
        if html:
            edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
            edges = edge_media['edges']
            if edges:
                end_cursor = edge_media['page_info']['end_cursor']
                has_next_page = edge_media['page_info']['has_next_page']
                if has_next_page:
                    return end_cursor
                return ''

    async def get_display_url(self, max=50, end_cursor=""):
        """
        Parse out the display urls.
        :param max: number of images to fetch per request
        :param end_cursor: cursor used to request the next page
        :return: a list of up to {max} image links, plus the next end_cursor
        """
        pic_params = {
            'query_hash': 'f2405b236d85e8296cf30347c9f08c2a',
            'variables': '{{"id":"{0}","first":{1},"after":"{2}"}}'.format(self.user_id, max, end_cursor),
        }
        pic_url = self.ROOT_URL + 'graphql/query/'
        html = await self._http_request(pic_url, params=pic_params)
        if html:
            edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
            edges = edge_media['edges']
            if edges:
                display_urls = []
                for edge in edges:
                    display_urls.append(edge['node']['display_url'])
                return display_urls, self.get_ends_cursor(html)

    async def download(self, url):
        """
        Download one image to local disk.
        :param url:
        :return:
        """
        print('processing:', url)
        # try:
        #     async with self.sem:  # using the Semaphore here hangs... though no error is raised
        self.down_todo.remove(url)
        self.down_busy.add(url)
        path = './instagram/' + self.username
        if not os.path.exists(path):
            os.makedirs(path)

        filename = path + '/' + url.split('?')[0].split('/')[-1]
        print('start download:', url)
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url, headers=self.headers, proxy=self.proxy) as resp:
                    if resp.status == 200:
                        f = await aiofiles.open(filename, 'wb')
                        await f.write(await resp.read())
                        await f.close()
                        await asyncio.Task(self.addurls(self.end_cursor))
                    resp.close()
                    self.down_done[url] = True
            except Exception as exc:
                logging.error('[download] download failed: {}\n url: {}'.format(repr(exc), url))
                self.down_done[url] = False

        self.down_busy.remove(url)
        print(len(self.down_done), 'completed tasks,', len(self.down_tasks),
              'still pending, todo', len(self.down_todo))
        # this check is useless: it never fires, the program just hangs
        if self.end_cursor is False:
            print('all downloads finished')
            self.loop.close()

        # except Exception as exc:
        #     logging.error('[download] download failed: {}\n url: {}'.format(repr(exc), url))

    async def add_down_urls(self, urls):
        print('[add_down_urls] starting downloads, count:', len(urls))
        for url in urls:
            self.down_todo.add(url)
            await self.sem.acquire()
            task = asyncio.ensure_future(self.download(url), loop=self.loop)
            task.add_done_callback(lambda t: self.sem.release())
            task.add_done_callback(self.down_tasks.remove)
            self.down_tasks.add(task)

    async def addurls(self, end_cursor=""):
        """
        :param end_cursor: base64 marker of the current page, used to load the next
                           page; when there is no next page this parameter is False
        :return:
        """
        print("\n\n fetching next page, end_cursor:", end_cursor)

        display_urls, self.end_cursor = await self.get_display_url(end_cursor=end_cursor)
        await self.add_down_urls(display_urls)
        if not self.end_cursor:
            return

'''
Flow: run() --> addurls() --> add_down_urls() --> download()
        ^                                             |
        |<--------------------<-----------------------|
'''

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    ins = Instagram('taeri__taeri', loop)
    future = asyncio.ensure_future(ins.addurls(), loop=loop)
    try:
        loop.add_signal_handler(signal.SIGINT, loop.stop)
    except RuntimeError:
        pass
    loop.run_forever()
    # loop.run_until_complete(future)
    # for i in future.result():
    #     print(">>>>", i)
    # ins.main()
    end = time.time()
    print('elapsed:', end - start)

The problem I'm hitting: without the Semaphore it downloads like crazy at the start, and the downloads do succeed, but then it just hangs. It doesn't stop, it just stays stuck (forgive my use of the word "stuck"). I'd appreciate it if someone could point out where I went wrong, thanks.



11 replies

The sketch is here... it took me more than half an hour just to find a drawing tool: https://github.com/ZCKun/d/blob/master/a.png


I've run into this problem before. The root cause is that the event loop still has unfinished coroutine tasks. Hitting Ctrl+C or calling loop.stop() directly may not be enough; you need to shut down gracefully.

The key is to catch the signal and then cancel all the tasks. Here's a complete solution:

import asyncio
import signal
import aiohttp
from typing import List

class Downloader:
    def __init__(self):
        self.tasks: List[asyncio.Task] = []
        self.shutdown_flag = False
        
    async def download(self, url: str):
        """下载单个URL"""
        if self.shutdown_flag:
            return
            
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    content = await response.read()
                    print(f"Downloaded {url}, size: {len(content)} bytes")
                    await asyncio.sleep(0.1)  # simulate processing time
        except asyncio.CancelledError:
            print(f"Cancelled download: {url}")
            raise
        except Exception as e:
            print(f"Error downloading {url}: {e}")
    
    async def worker(self, urls: List[str]):
        """工作协程,处理多个URL"""
        for url in urls:
            if self.shutdown_flag:
                break
            task = asyncio.create_task(self.download(url))
            self.tasks.append(task)
        
        # wait for all tasks to finish (or be cancelled)
        if self.tasks:
            await asyncio.wait(self.tasks, return_when=asyncio.ALL_COMPLETED)
        
    def shutdown(self):
        """触发关闭"""
        self.shutdown_flag = True
        for task in self.tasks:
            if not task.done():
                task.cancel()

async def main():
    # sample URL list
    urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)] * 2
    
    downloader = Downloader()
    
    # set up signal handlers
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, downloader.shutdown)
    
    try:
        await downloader.worker(urls)
    except asyncio.CancelledError:
        print("Main task cancelled")
    finally:
        # clean up: wait for any remaining tasks
        if downloader.tasks:
            await asyncio.gather(*downloader.tasks, return_exceptions=True)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram terminated by user")

The key points of this approach:

  1. shutdown_flag marks the shutdown state, so no new tasks get started
  2. Once the signal is caught, all running tasks are cancelled
  3. The download method handles CancelledError so tasks can exit cleanly
  4. Finally, asyncio.gather(return_exceptions=True) waits for every task to finish, so no unhandled exceptions escape

If you're on Python 3.11+, you can also use asyncio.timeout() to put a deadline on any awaited operation; on older versions, asyncio.wait_for() does the same job.
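For reference, asyncio.timeout() is used as an async context manager; a minimal sketch, where fetch_one is a hypothetical stand-in for a real download coroutine:

import asyncio

async def fetch_one(url: str) -> None:
    await asyncio.sleep(5)  # hypothetical stand-in for the real download

async def main() -> None:
    try:
        # cancel the enclosed block if it runs longer than 10 seconds
        async with asyncio.timeout(10):
            await fetch_one("https://example.com/img.jpg")
    except TimeoutError:  # raised by asyncio.timeout on expiry (3.11+)
        print("timed out")

asyncio.run(main())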

In short: signal handling + task cancellation + exception handling.

The Instagram account is a throwaway, so never mind that I left the cookie in.

Is there really no way to edit a post?.. There are some leftover mistakes in the code I forgot to delete, hope you don't mind.

Hi, I studied your code and found a small problem:

# this check is useless: it never fires, the program just hangs
if self.end_cursor is False:

That's because when there is no next page, end_cursor is '' (an empty string), not False.
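Since '' and False are both falsy, a truthiness test covers both cases. A minimal sketch of the fix:

# '' (no next page) and False are both falsy
if not self.end_cursor:
    print('all downloads finished')
    self.loop.stop()  # stop(), not close(): closing a running loop raises RuntimeError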


Beyond that, I think the architecture is the problem. This is the classic producer-consumer model: requesting pages and parsing out the image links is the producer, and then you start several consumers to download those links. I refactored your code along those lines; see https://gist.github.com/cshuaimin/4cf8d769b88e93fc805ceefb9af8c1f4
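Independent of that gist, the shape of the pattern looks roughly like this; fetch_page and download are hypothetical stand-ins for the real request/parse and download steps:

import asyncio

# hypothetical two-page dataset: end_cursor -> (image urls, next end_cursor)
PAGES = {"": (["u1", "u2"], "c1"), "c1": (["u3", "u4"], "")}

async def fetch_page(end_cursor):
    await asyncio.sleep(0.1)   # simulate the HTTP round-trip + parsing
    return PAGES[end_cursor]

async def download(url):
    await asyncio.sleep(0.1)   # simulate downloading one image
    print("downloaded", url)

async def producer(queue):
    end_cursor = ""
    while True:
        urls, end_cursor = await fetch_page(end_cursor)
        for url in urls:
            await queue.put(url)
        if not end_cursor:     # no next page: stop producing
            break

async def consumer(queue):
    while True:
        url = await queue.get()
        try:
            await download(url)
        finally:
            queue.task_done()

async def crawl(n_consumers=4):
    queue = asyncio.Queue(maxsize=100)
    consumers = [asyncio.ensure_future(consumer(queue)) for _ in range(n_consumers)]
    await producer(queue)      # produce until there are no more pages
    await queue.join()         # wait until every queued url has been processed
    for c in consumers:        # then cancel the now-idle consumers
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)  # let cancellations finish

loop = asyncio.get_event_loop()
loop.run_until_complete(crawl())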

One more thing: the _http_request method creates a new ClientSession for every single request, which is wasteful; it's recommended to use just one session. From the docs:

Session encapsulates a connection pool (connector instance) and supports keepalives by default. Unless you are connecting to a large, unknown number of different servers over the lifetime of your application, it is suggested you use a single session for the lifetime of your application to benefit from connection pooling.

https://docs.aiohttp.org/en/stable/client_reference.html
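Applied to this crawler, a minimal sketch of the single-session layout (names follow the original code; this is an illustration, not a drop-in patch):

import aiohttp

class Instagram(object):

    def __init__(self, username, loop):
        self.loop = loop
        self.proxy = "http://localhost:8001"
        # one ClientSession = one connection pool, reused by every request
        self.session = aiohttp.ClientSession(loop=loop)

    async def _http_request(self, url, params=None):
        async with self.session.get(url, timeout=10, proxy=self.proxy,
                                    params=params) as response:
            return (await response.read()).decode('utf-8', 'replace')

    async def close(self):
        await self.session.close()  # close the pool once, after the crawl finishes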

Earlier, the get_end_cursor method did return False when it couldn't get a cursor; I forgot to update that.

Thank you, I'll take a look.

GvR has a 500lines project you can refer to. Also, what was said above is right: this is the classic producer-consumer pattern. And it seems you haven't considered deduplication, redirects, or retrying on timeout?
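For illustration, dedup and timeout retry can be layered on with a shared seen set and a bounded retry loop; a sketch, where download is a hypothetical stand-in for the real aiohttp fetch:

import asyncio

seen = set()  # urls that have already been scheduled (deduplication)

async def download(url):
    await asyncio.sleep(0.1)  # hypothetical stand-in for the real aiohttp fetch

async def download_with_retry(url, retries=3):
    for attempt in range(1, retries + 1):
        try:
            await download(url)
            return True
        except asyncio.TimeoutError:
            print('timeout on {}, attempt {}/{}'.format(url, attempt, retries))
            await asyncio.sleep(2 ** attempt)  # simple exponential backoff
    return False

def schedule(url, loop):
    if url in seen:  # skip urls we've already queued
        return
    seen.add(url)
    asyncio.ensure_future(download_with_retry(url), loop=loop)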

Got it. I learned this back when I studied Java, but I've mostly forgotten it. I'll go review it, thanks.
