Python中多进程下载网络拥堵问题如何解决?

我需要下载约 10 万个小文件(每个 1M 左右。)

写了一个脚本, 开 8 个进程,每个进程用 10 个线程下载。

问题是: 无论如何修改进程,线程数,下载速度都没有太大变化,每秒 4 个左右。 而且本机的网络会很卡,打开网页很慢,,ping 192.168.1.1 (网关) 都要几百毫秒 。 这是为什么? 应该怎么优化? 同路由器链接的其他机器,网络不受影响。 我的机器配置: i7-3667u,256GB SSD, 8G 内存,ubuntu 14.04 64 位系统

64 bytes from 192.168.1.1: icmp_seq=372 ttl=64 time=210 ms
64 bytes from 192.168.1.1: icmp_seq=373 ttl=64 time=136 ms
64 bytes from 192.168.1.1: icmp_seq=374 ttl=64 time=166 ms
64 bytes from 192.168.1.1: icmp_seq=375 ttl=64 time=139 ms
64 bytes from 192.168.1.1: icmp_seq=376 ttl=64 time=369 ms
64 bytes from 192.168.1.1: icmp_seq=377 ttl=64 time=537 ms
64 bytes from 192.168.1.1: icmp_seq=378 ttl=64 time=536 ms
64 bytes from 192.168.1.1: icmp_seq=379 ttl=64 time=582 ms
64 bytes from 192.168.1.1: icmp_seq=380 ttl=64 time=461 ms
64 bytes from 192.168.1.1: icmp_seq=381 ttl=64 time=363 ms
64 bytes from 192.168.1.1: icmp_seq=382 ttl=64 time=309 ms
64 bytes from 192.168.1.1: icmp_seq=383 ttl=64 time=277 ms
64 bytes from 192.168.1.1: icmp_seq=384 ttl=64 time=312 ms
64 bytes from 192.168.1.1: icmp_seq=385 ttl=64 time=268 ms
64 bytes from 192.168.1.1: icmp_seq=386 ttl=64 time=392 ms
64 bytes from 192.168.1.1: icmp_seq=387 ttl=64 time=400 ms
64 bytes from 192.168.1.1: icmp_seq=388 ttl=64 time=266 ms
64 bytes from 192.168.1.1: icmp_seq=389 ttl=64 time=551 ms
64 bytes from 192.168.1.1: icmp_seq=390 ttl=64 time=382 ms
64 bytes from 192.168.1.1: icmp_seq=391 ttl=64 time=289 ms
64 bytes from 192.168.1.1: icmp_seq=392 ttl=64 time=309 ms
64 bytes from 192.168.1.1: icmp_seq=393 ttl=64 time=445 ms
64 bytes from 192.168.1.1: icmp_seq=394 ttl=64 time=411 ms
64 bytes from 192.168.1.1: icmp_seq=395 ttl=64 time=274 ms
64 bytes from 192.168.1.1: icmp_seq=396 ttl=64 time=262 ms
64 bytes from 192.168.1.1: icmp_seq=397 ttl=64 time=282 ms
64 bytes from 192.168.1.1: icmp_seq=398 ttl=64 time=588 ms
64 bytes from 192.168.1.1: icmp_seq=399 ttl=64 time=614 ms
64 bytes from 192.168.1.1: icmp_seq=400 ttl=64 time=222 ms
64 bytes from 192.168.1.1: icmp_seq=401 ttl=64 time=238 ms
64 bytes from 192.168.1.1: icmp_seq=402 ttl=64 time=335 ms
64 bytes from 192.168.1.1: icmp_seq=403 ttl=64 time=461 ms
64 bytes from 192.168.1.1: icmp_seq=404 ttl=64 time=491 ms
#coding=utf8

import threading

import multiprocessing import time import Queue import os import re import requests

#SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列 SHARE_Q = multiprocessing.Queue() #构造一个不限制大小的的队列 _WORKER_THREAD_NUM = 2 #设置线程个数 _WORKER_PROCESS_NUM = 8

class MyThread(threading.Thread) :

def __init__(self, func, q) :
    super(MyThread, self).__init__()
    self.func = func
    self.queue = q

def run(self) :
    self.func(self.queue)

def worker(q): session = requests.Session() while not q.empty(): url, name = q.get() #获得任务 print "Processing : ", url, name with open(name, ‘w’) as f: f.write(session.get(url).content)

def main() : global SHARE_Q

for root,dirs,files in os.walk(‘data’): for filespath in files: url = os.path.join(root,filespath) url = os.path.abspath(url) temp = [] if url.endswith(‘m3u8’): path = os.path.dirname(url) new_m3u8_path = os.path.join(path, ‘new_video.m3u8’) content = open(url).read() for x in content.split(’\n’): if x.startswith(‘http://’): x = x.split(’?’)[0] _, name = os.path.split(x) save_path = os.path.join(path, name) temp.append(name) if not os.path.exists(save_path) or os.path.getsize(save_path) == 0: #print ‘download’, x, save_path #os.system(‘wget %s -O %s’ % (x, save_path)) SHARE_Q.put([x, save_path]) #with open(save_path, ‘w’) as f: #f.write(requests.get(x).content) else: temp.append(x) with open(new_m3u8_path, ‘w’) as f: f.write("\n".join(temp)) #exit()

processs = [] print ‘Queue length’, SHARE_Q.qsize()

def func(q, max_thread_num): threads = [] for i in xrange(max_thread_num) : thread = MyThread(worker, q) thread.start() threads.append(thread) for thread in threads : thread.join()

for i in xrange(_WORKER_PROCESS_NUM): p = multiprocessing.Process(target=func, args=(SHARE_Q, _WORKER_THREAD_NUM)) p.start() processs.append§

while True: time.sleep(10) print ‘Queue length’, SHARE_Q.qsize()

for p in processs: p.join()


Python中多进程下载网络拥堵问题如何解决?

14 回复

楼主如果只是为了下载程序的话, 研究一下 aria2c, 他支持文件形式的批量导入下载链接的…你只需要给他一个 1w 文件的链接即可…多线程, 多文件同时下载占满带宽不是问题


用多进程下载时网络拥堵,主要是因为同时发起的连接数太多,把带宽打满了或者被服务器限制了。核心思路是控制并发连接数,别让所有进程一块儿猛下。

最直接的办法是用进程池(multiprocessing.Pool)或者信号量(threading.Semaphore)来限制同时活跃的下载进程数量。比如,你可以设置一个全局的信号量,每个下载任务开始前先获取信号量,这样就能控制住最大并发数。

下面是个用multiprocessing.Pool配合imap的例子,它能很方便地控制工作进程的数量,避免创建过多进程:

import requests
from multiprocessing import Pool
import os

def download_file(url_path):
    url, save_path = url_path
    try:
        # 流式下载大文件,避免内存爆掉
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(save_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"下载完成: {save_path}")
        return save_path, True
    except Exception as e:
        print(f"下载失败 {url}: {e}")
        return save_path, False

def main():
    # 你的下载任务列表,格式:(url, 本地保存路径)
    download_tasks = [
        ('https://example.com/file1.zip', './data/file1.zip'),
        ('https://example.com/file2.zip', './data/file2.zip'),
        # ... 更多任务
    ]

    # 关键在这里:设置进程池的大小,比如4个进程,这就是最大并发数
    # 根据你的网络和服务器情况调整这个数,一般4-8个就差不多了
    max_workers = 4
    os.makedirs('./data', exist_ok=True)

    with Pool(processes=max_workers) as pool:
        # 使用imap有序迭代结果,也可以用imap_unordered如果顺序不重要
        for save_path, success in pool.imap(download_file, download_tasks):
            if not success:
                # 这里可以加失败重试的逻辑
                pass

if __name__ == '__main__':
    main()

这个方案里,max_workers就是控制并发的阀门。如果还堵,就再把这个数调小点。另外,单个下载任务里用stream=True和分块写入,是为了防止下大文件时内存炸了。

简单说就是:用池子控制进程数,别开太多。

带宽没跑满吗?我也差不多的情况,开 5 个线程跑就满了。

磁盘 io,CPU 负载,网络流量都不贴下啊

带宽没有跑满。 磁盘 io, cpu, 网络流量 都很低,

在服务器上用 tar 把十万个小文件压缩成一个大文件
下载下来
再解压

我怎么觉得每秒下载速度 4M 很正常…

我是要下载 勤思考研的视频。
每个视频都是一个 m3u8 的文件,每个视频分割成了 200 多个小文件。 需要先把这些小文件下载下来, 再用 ffmpeg 合成一个新文件。

这些小文件一共有 10 多万。下载了约 7 个小时才下载完成。


是不是你用的无线网络下载?
如果你的网络带宽没问题,可能是网卡跑满了,你换成有线再试试

如果用无线网络的话,普通路由 4M 已经满带宽了

电脑路由器都重启一下

可以试下用协程,用 asyncio 和 aiohttp 试一下。我用这两个库写过下载 m3u8 的视频的脚本,下载速度很快。

路由器是水星 mw300R ( 300M ), 笔记本通过 wifi 无线连接 路由器。
能换个好点的路由器吗?
能换个支持 QoS 的路由器吗。
能用有线连接吗?

回到顶部