Python异步多线程下载又拍云图片问题求助

代码

import asyncio
import base64
import os
import urllib

import aiohttp

-----------------------

-----------------------

bucket = ‘xxx’ username = ‘xxx’ password = ‘xxxxxx’ hostname = “xxxxxx” base_save_path = ‘f:’

-----------------------

headers = {} auth = base64.b64encode(f’{username}:{password}’.encode(encoding=‘utf-8’)) headers[‘Authorization’] = 'Basic ’ + str(auth) headers[‘User-Agent’] = “UPYUN_DOWNLOAD_SCRIPT” headers[‘x-list-limit’] = ‘300’

thread_sleep = 1

def is_dic(url): “”“判断 key 是否是目录 根据是否有后缀名判断”"" url = url.replace(‘http://v0.api.upyun.com/’, ‘’) # print(len(url.split(’.’))) if len(url.split(’.’)) == 1: return True else: return False

class Crawler: def init(self, init_key, hostname, max_tasks=10, pic_tsak=50): ‘’‘初始化爬虫’’’ self.loop = asyncio.get_event_loop() self.max_tries = 4 # 每个图片重试次数 self.max_tasks = max_tasks # 接口请求进程数 self.key_queue = asyncio.Queue(loop=self.loop) # 接口队列 self.pic_queue = asyncio.Queue(loop=self.loop) # 图片队列 self.session = aiohttp.ClientSession(loop=self.loop) #接口异步 http 请求 self.pic_session = aiohttp.ClientSession(loop=self.loop) #图片异步 http 请求 self.key_queue.put_nowait({‘key’: init_key, ‘x-list-iter’: None, ‘hostname’: hostname}) #初始化接口队列 push 需要下载的目录 self.pic_tsak = pic_tsak #图片下载进程数(接口有调用频率限制,http 下载没有限制)

def close(self):
    """回收 http session"""
    self.session.close()
    self.pic_session.close()

async def work(self):
    """接口请求队列消费者"""
    try:
        while True:
            url = await self.key_queue.get()
            # print('key 队列数量:' + await self.key_queue.qsize())
            await self.handle(url)
            self.key_queue.task_done()
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        pass

async def work_pic(self):
    """图片请求队列消费者"""
    try:
        while True:
            url = await self.pic_queue.get()
            await self.handle_pic(url)
            self.pic_queue.task_done()
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        pass

async def handle_pic(self, key):
    """处理图片请求"""
    url = (lambda x: x[0] == '/' and x or '/' + x)(key['key'])
    url = url.encode('utf-8')
    url = urllib.parse.quote(url)

    pic_url = key['hostname'] + url + '!s400'

    tries = 0
    while tries < self.max_tries:
        try:
            response = await self.pic_session.get(pic_url)
            break
        except aiohttp.ClientError:
            pass
        tries += 1
    try:
        if is_dic(url):
            # print('图片线程-目录 :{}'.format(url))
            content = await response.text()
            try:
                iter_header = response.headers.get('x-upyun-list-iter')
            except Exception as e:
                iter_header = 'g2gCZAAEbmV4dGQAA2VvZg'

            list_json_param = content + "`" + str(response.status) + "`" + str(iter_header)
            self.do_file(self.get_list(list_json_param), key['key'], key['hostname'])
        else:
            # print('图片线程-文件:{}'.format(key['save_path']))
            with open(key['save_path'], 'wb') as f:
                f.write(await response.read())
    finally:
        await response.release()

async def handle(self, key):

    """处理接口请求"""
    url = '/' + bucket + (lambda x: x[0] == '/' and x or '/' + x)(key['key'])
    url = url.encode('utf-8')
    url = urllib.parse.quote(url)

    if key['x-list-iter'] is not None:
        if key['x-list-iter'] is not None or not 'g2gCZAAEbmV4dGQAA2VvZg':
            headers['X-List-Iter'] = key['x-list-iter']

    tries = 0
    while tries < self.max_tries:
        try:
            response = await self.session.get("http://v0.api.upyun.com" + url, headers=headers)
            break
        except aiohttp.ClientError:
            pass
        tries += 1
    try:
        if is_dic(url):
            # print('目录线程-目录 :{}'.format(url))
            content = await response.text()
            try:
                iter_header = response.headers.get('x-upyun-list-iter')
            except Exception as e:
                iter_header = 'g2gCZAAEbmV4dGQAA2VvZg'

            list_json_param = content + "`" + str(response.status) + "`" + str(iter_header)
            self.do_file(self.get_list(list_json_param), key['key'], key['hostname'])
        else:
            # print('目录线程-文件:{}'.format(key['save_path']))
            with open(key['save_path'], 'wb') as f:
                f.write(await response.read())
    finally:
        await response.release()

def get_list(self, content):
    # print(content)
    if content:
        content = content.split("`")
        items = content[0].split('\n')
        content = [dict(zip(['name', 'type', 'size', 'time'], x.split('\t'))) for x in items] + content[1].split() + \
                  content[2].split()
        return content
    else:
        return None

def do_file(self, list_json, key, hostname):
    """处理接口数据"""
    for i in list_json[:-2]:
        if not i['name']:
            continue
        new_key = key + i['name'] if key == '/' else key + '/' + i['name']
        try:
            if i['type'] == 'F':
                self.key_queue.put_nowait({'key': new_key, 'x-list-iter': None, 'hostname': hostname})
            else:
                try:
                    if not os.path.exists(bucket + key):
                        os.makedirs(bucket + key)
                except OSError as e:
                    print('新建文件夹错误:' + str(e))
                save_path = base_save_path + '/' + bucket + new_key
                if not os.path.isfile(save_path):
                    print(f'请求图片:', new_key)
                    self.pic_queue.put_nowait(
                        {'key': new_key, 'save_path': save_path, 'x-list-iter': None, 'hostname': hostname})
                else:
                    print(f'文件已存在:{save_path}')
        except Exception as e:
            print('下载文件错误!:' + str(e))
            with open('download_err.txt', 'a') as f:
                f.write(new_key + '\n')
    if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg':
        self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname})
        # self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname})

async def run(self):
    """初始化任务进程"""
    workers = [asyncio.Task(self.work(), loop=self.loop)
               for _ in range(self.max_tasks)]

    workers_pic = [asyncio.Task(self.work_pic(), loop=self.loop)
                   for _ in range(self.pic_tsak)]

    await self.key_queue.join()
    await self.pic_queue.join()

    workers.append(workers_pic)
    for w in workers:
        w.cancel()

if name == ‘main’: loop = asyncio.get_event_loop() crawler = Crawler(’/’, hostname, max_tasks=5, pic_tsak=150) loop.run_until_complete(crawler.run())

crawler.close()

loop.close()

上面是代码

问题

  1. 以上代码执行后没什么问题。但是当长时间执行后会卡主。。百思不得其解(猜测可能是队列问题?但是无法验证)。为何到达一定时间(大约 5 小时以上)脚本会卡死?
  2. 此脚本目的为了下载又拍云所有图片保存到本地。图片量非常大(大约 10T) 3 亿张左右。目前机器的下载宽带大概在 300M/下载速度大约 30M/S ,多次联系又拍云,又拍云表示只能这样下载。无法通过邮寄硬盘直接拷贝。我们也在杭州。但是又拍云无法拷贝 还有什么特殊方法可以快速下载所有图片?

Python异步多线程下载又拍云图片问题求助

33 回复

我无法理解你的问题

昂 这么硬的问题木有人给解答下吗

难道是保存文件的时候引起的?

在线等。。。。急

更正:
python<br>def is_dic(url):<br> """判断 key 是否是目录 根据是否有后缀名判断"""<br> url = url.replace('<a target="_blank" href="http://v0.api.upyun.com/'" rel="nofollow noopener">http://v0.api.upyun.com/'</a>, '')<br> # print(len(url.split('.')))<br> if len(url.split('.')) &gt; 1:<br> return True<br> else:<br> return False<br>

擦。。。看错了 帖子主题的代码是正确的。。

脚本运行时,用 strace 看一下卡在哪里了

strace -p <pid>

windows 下有这个玩意么

脚本是跑在 windows 下的。。因为硬盘要用 ntfs 格式给某部门送过去

另外一个比较值得怀疑的一点是,你的所有文件操作没有关闭,有可能用尽 open files 限制。
可以对比一下这三个值:
用户限制:ulimit -n
进程限制:cat /proc/<pid>/limits
实际使用:ls /proc/<pid>/fd | wc -l

当我没说。。

cygwin / mingw 之类的可能有 strace

这里使用了 with 语句,应该能保证 with 语句执行完毕后已经关闭了打开的文件句柄。应该不是这个问题呀。

加个 timeout 试试,要等 tcp 的 timeout 机制触发要好久的

是在每次 http 请求的时候加的吗?

但是是异步的啊 应该会等待请求完成的啊

对于单个进程而言 会等待的啊

一个可能是池子里面 task 全塞死了,

http 请求引起的卡死吗?

我觉得好像找到问题了。当 asyncio 队列满了之后 会阻塞线程。但是我这里用的 put_nowait

http://python.usyiyi.cn/translate/python_352/library/asyncio-queue.html
put_nowait(item)
将项目放入队列而不阻塞。

如果没有可用的空位,引发 QueueFull。

又没有设置 queue 的 max_size,怎么会 QueueFull

一些细节的东西:

* 用一个 ClientSession 就好,或者多个 ClientSession 用同一个 TCPConnector
* session.get 要加 timeout,我以前遇到过卡死过在请求上
* response 可以用 async with 打开,可靠性和可读性都有提高
* Windows 下要确认是不是有很多文件没有关闭,可以用 OpenedFilesView

另外写文件使用的 io 阻塞操作

写文件使用 aiofiles 实现异步写操作

async with aiofiles.open(‘download_err.txt’, ‘a’) as f:
await f.write(new_key + ‘\n’)

跑跑看 有问题在此贴继续讨论

感谢 我都试试看 谢谢

按您说的几点
1. ClientSession 已改为一个
2. session.get timeout=60
3. response 使用 async with 打开

目前再跑 明日看看会不会还卡死

Windows 的话,我只知道 Visual Studio 能直接附加调试 Python 代码,不知道是啥黑科技。

在怀疑的代码快前后加两个变量计数,程序加个中断信号控制器,打印计数的变量

目前没卡死了 一台电脑因为队列太大内存爆了
目前没有卡死了 一直在运行

说下最终解决方案吧
1. 使用 mq 替代自带的队列,这样也方便断点续传,还有个原因是其中一个 bucket 图片大约有 7T 而我们购买的硬盘单个只有 4T 又不能做磁盘阵列,所以需要两个同时下而且不重复,于是乎就改造成了分布式的了
2. 拆分生产者和消费者,其中获得目录的服务既是消费者又是生产者,目前按照 24 楼仁兄的几个细节建议,改进了代码另外加了多处的异常判断和处理,目前非常健壮和稳定

速度:单机跑 100 个线程 两台机器一起跑,不加延时大约每秒 400 张,但是带宽会爆满,然后 mq 会断掉,所以加了延时,每个线程每次下载后加了 0.05 秒的延时后 下载速度大约每秒 300 到 360 张浮动,带宽占用百分之 80 到 95 浮动。目前很稳定。。

感谢各位

回到顶部