Python中使用async/await下载文件失败,如何解决?

目前需要从一个 FTP 服务器下载 3 万多个小文件,之前用 multiprocessing 总是下一部分之后就停了,所以尝试用异步加快下载:

class pdb:
    def __init__(self):
        self.ids = []
        self.dl_id = []
        self.err_id = []
async def download_file(self, session, url):
    try:
        with async_timeout.timeout(10):
            async with session.get(url) as remotefile:
                if remotefile.status == 200:
                    data = await remotefile.read()
                    return {"error": "", "data": data}
                else:
                    return {"error": remotefile.status, "data": ""}
    except Exception as e:
        return {"error": e, "data": ""}

async def unzip(self, session, work_queue):
    while not work_queue.empty():
        queue_url = await work_queue.get()
        print(queue_url)
        data = await self.download_file(session, queue_url)
        id = queue_url[-11:-7]
        ID = id.upper()
        if not data["error"]:
            saved_pdb = os.path.join("./pdb", ID, f'{ID}.pdb')
            if ID not in self.dl_id:
                self.dl_id.append(ID)
            with open(f"{id}.ent.gz", 'wb') as f:
                f.write(data["data"].read())
            with gzip.open(f"{id}.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile:
                shutil.copyfileobj(inFile, outFile)
            os.remove(f"{id}.ent.gz")
        else:
            self.err_id.append(ID)

def download_queue(self, urls):
    loop = asyncio.get_event_loop()
    q = asyncio.Queue(loop=loop)
    [q.put_nowait(url) for url in urls]
    con = aiohttp.TCPConnector(limit=10)
    with aiohttp.ClientSession(loop=loop, connector=con) as session:
        tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))]
        loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()

if name == “main”: x = pdb() urls = [‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/nf/pdb4nfn.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ny/pdb4nyj.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/mn/pdb2mnz.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ra/pdb4ra4.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/x5/pdb4x5w.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/dm/pdb2dmq.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/n7/pdb2n7r.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/om/pdb2omv.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/oy/pdb3oy8.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/fe/pdb3fej.ent.gz’, ‘ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/hw/pdb2hw9.ent.gz’] x.download_queue(urls)

报错信息如下:

Traceback (most recent call last):
File "test.py", line 111, in <module>
x.download_queue(urls)
File "test.py", line 99, in download_queue
loop.run_until_complete(asyncio.gather(*tasks))
File "/home/yz/miniconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
return future.result()
File "test.py", line 73, in unzip
data = await self.download_file(session, queue_url)
File "test.py", line 65, in download_file
return {"error": remotefile.status, "data": ""}
File "/home/yz/miniconda3/lib/python3.6/site-packages/async_timeout/init.py", line 46, in exit
raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError

请大家帮忙看看。谢谢!


Python中使用async/await下载文件失败,如何解决?

23 回复

lftp 开多线程 10 万文件我也下载过,一点问题没有啊!除非服务器限制吧。


遇到 async/await 下载文件失败,通常有几个常见原因和对应的解决方法。下面是一个完整的、健壮的异步下载文件示例,涵盖了超时、重试和错误处理。

import aiohttp
import asyncio
import os

async def download_file(url, save_path, retries=3, timeout=10):
    """
    异步下载文件,支持重试和超时设置。
    
    :param url: 文件URL
    :param save_path: 本地保存路径
    :param retries: 重试次数
    :param timeout: 超时时间(秒)
    """
    async with aiohttp.ClientSession() as session:
        for attempt in range(retries):
            try:
                async with session.get(url, timeout=timeout) as response:
                    if response.status == 200:
                        # 确保保存目录存在
                        os.makedirs(os.path.dirname(save_path), exist_ok=True)
                        
                        # 写入文件
                        with open(save_path, 'wb') as f:
                            while True:
                                chunk = await response.content.read(1024)
                                if not chunk:
                                    break
                                f.write(chunk)
                        print(f"文件下载成功: {save_path}")
                        return True
                    else:
                        print(f"HTTP错误: {response.status}")
            except asyncio.TimeoutError:
                print(f"尝试 {attempt + 1}/{retries}: 下载超时")
            except aiohttp.ClientError as e:
                print(f"尝试 {attempt + 1}/{retries}: 网络错误 - {e}")
            except Exception as e:
                print(f"尝试 {attempt + 1}/{retries}: 未知错误 - {e}")
            
            # 如果不是最后一次尝试,等待后重试
            if attempt < retries - 1:
                await asyncio.sleep(2 ** attempt)  # 指数退避
    
    print(f"下载失败,已重试{retries}次: {url}")
    return False

async def main():
    url = "https://example.com/file.zip"
    save_path = "./downloads/file.zip"
    
    success = await download_file(url, save_path)
    if success:
        print("下载任务完成")
    else:
        print("下载任务失败")

# 运行主函数
if __name__ == "__main__":
    asyncio.run(main())

关键点说明:

  1. 使用 aiohttp:这是异步HTTP客户端库,比 requests 更适合异步环境。
  2. 超时处理:通过 timeout 参数防止请求挂起。
  3. 重试机制:当网络不稳定时自动重试,采用指数退避策略。
  4. 错误处理:捕获各种可能异常(超时、网络错误、HTTP状态码等)。
  5. 目录创建:自动创建不存在的保存目录。

常见问题排查:

  • 如果是SSL证书问题,可以在 session.get() 中添加 ssl=False 参数(仅测试环境使用)。
  • 确保URL可访问且文件存在。
  • 检查网络连接和代理设置。

一句话总结:aiohttp 替代同步库,加上超时、重试和错误处理就能解决大部分异步下载问题。

#1 哈哈我知道,肯定是我代码的问题不是人家服务器的问题

timeout 时间设置的太短了?

#3 我试过把那行删了,就会一直卡在那,感觉还是其他地方有问题。

#5 已经有 print 了呀,会直接把所有链接都打印出来然后卡住……

老老实实用 python 做一个下载链接的清单, 然后用 aria2c 下载吧… 省出来的时间都是你的

unzip 不要用 async,CPU 密集型。

Python 可以真多线程了吗?

#7 以后可能会了……但是这个就是练手的项目,想弄明白异步到底该怎么写。

#8 没太明白,unzip 里面只有取数据那一行用了 await,其他步骤都没有,这样也不可以吗?谢谢帮忙!

#9 还是有 GIL 的,估计多线程是有生之年系列了……不过现在多核越来越不值钱,多线程意义也没那么大了吧。

是客户端连接出错了把。从队列中取出 url 后会打印,发生了异常会继续拿,这样的话,如果请求有问题,应该是直接打印所有的 url 后结束?

你的问题在于:

with open(f"{id}.ent.gz", ‘wb’) as f:
…f.write(data[“data”].read())
with gzip.open(f"{id}.ent.gz", “rb”) as inFile, open(saved_pdb, “wb”) as outFile:
…shutil.copyfileobj(inFile, outFile)
os.remove(f"{id}.ent.gz")

这几行是没法被 asyncio 通过 Coroutine 并行化的,只能多线程。但是这就产生了两个问题,第一默认的 asyncio 不是多线程并行化的,第二即使设置 asyncio 多线程并行化,考虑到 GIL,Python 多线程也是不够用的。所以总体来说,asyncio 对你这段程序是不够的。还是得上多进程。

GIL 在文件 I/O 时不是会释放吗?
支持 async 的文件 I/O 的有:
1.aiofiles: https://github.com/Tinche/aiofiles
2.asyncio 中有 thread pool executor. run_in_executor()也可以处理文件 I/O.
如果还有别的方法,欢迎补充 :)

你说 unzip 是 cpu 密集型,那这跟 GIL 有啥关系? 还是要上多进程。

GIL 的存在会使得 Python 里的多线程对 CPU 密集型 程序优化作用有限,而多进程就可以避免这个缺点。

select, epoll 应该只是 I/O 复用,但其实还是属于阻塞模型吧?只不过是在 select 的时候阻塞,而不是在真正的 IO 调用上。

GIL 的存在导致文件读取的每个原子操作,线程切换的开销增大。而 shutil.copyfileobj,那是个 Python 循环,所以是不可能高效的。aiofiles 那东西在很多平台上面是多线程实现的,你可以 check 一下它的源代码。

基于这个原因,可以认为楼主的程序上多线程没救,所以 asyncio 就没有救(如果多线程有救,asyncio 还是可以用的)。而因为多线程无法使用,CPU 密集型的 unzip 就没法被 asyncio 搞定。这才导出了我的结论,unzip 是 cpu 密集型,不适合 asyncio。

至于 select、epoll 这类 I/O 复用,我觉得可以认为它们是非阻塞模型,因为它们避免了多线程模式下的 while { read } 县线程等待,和 callback 效果等同。我觉得并不一定 callback 才可以被认为是非阻塞,只要看是否达到同样的效果就可以了。

#19 那请问多进程和 asyncio 可以结合起来使用吗?之前用多进程写的也总是跑到一半就自己停了,也不报错。多谢帮助!

我记得 asyncio 有方法用多进程,不过现在的 api 都很基础很难用。所以你这需求用 python 其实挺麻烦的。

把 unzip,和 writefile 写成一个普通函数,用concurrent.futures.ProcessPoolExecutorloop.run_in_executor函数运行,这样就可以结合起来

await loop.run_in_executor(PPExecutor, func, args)

https://pymotw.com/3/asyncio/executors.html

为什么我跑你的代码会报
TypeError: Use async with instead
的错

装 aiohttp 2.3.0 版本, 解决 TypeError: Use async with instead

回到顶部