Python中多进程下载网络拥堵问题如何解决?
我需要下载约 10 万个小文件(每个 1M 左右。)
写了一个脚本, 开 8 个进程,每个进程用 10 个线程下载。
问题是: 无论如何修改进程,线程数,下载速度都没有太大变化,每秒 4 个左右。 而且本机的网络会很卡,打开网页很慢,,ping 192.168.1.1 (网关) 都要几百毫秒 。 这是为什么? 应该怎么优化? 同路由器链接的其他机器,网络不受影响。 我的机器配置: i7-3667u,256GB SSD, 8G 内存,ubuntu 14.04 64 位系统
64 bytes from 192.168.1.1: icmp_seq=372 ttl=64 time=210 ms
64 bytes from 192.168.1.1: icmp_seq=373 ttl=64 time=136 ms
64 bytes from 192.168.1.1: icmp_seq=374 ttl=64 time=166 ms
64 bytes from 192.168.1.1: icmp_seq=375 ttl=64 time=139 ms
64 bytes from 192.168.1.1: icmp_seq=376 ttl=64 time=369 ms
64 bytes from 192.168.1.1: icmp_seq=377 ttl=64 time=537 ms
64 bytes from 192.168.1.1: icmp_seq=378 ttl=64 time=536 ms
64 bytes from 192.168.1.1: icmp_seq=379 ttl=64 time=582 ms
64 bytes from 192.168.1.1: icmp_seq=380 ttl=64 time=461 ms
64 bytes from 192.168.1.1: icmp_seq=381 ttl=64 time=363 ms
64 bytes from 192.168.1.1: icmp_seq=382 ttl=64 time=309 ms
64 bytes from 192.168.1.1: icmp_seq=383 ttl=64 time=277 ms
64 bytes from 192.168.1.1: icmp_seq=384 ttl=64 time=312 ms
64 bytes from 192.168.1.1: icmp_seq=385 ttl=64 time=268 ms
64 bytes from 192.168.1.1: icmp_seq=386 ttl=64 time=392 ms
64 bytes from 192.168.1.1: icmp_seq=387 ttl=64 time=400 ms
64 bytes from 192.168.1.1: icmp_seq=388 ttl=64 time=266 ms
64 bytes from 192.168.1.1: icmp_seq=389 ttl=64 time=551 ms
64 bytes from 192.168.1.1: icmp_seq=390 ttl=64 time=382 ms
64 bytes from 192.168.1.1: icmp_seq=391 ttl=64 time=289 ms
64 bytes from 192.168.1.1: icmp_seq=392 ttl=64 time=309 ms
64 bytes from 192.168.1.1: icmp_seq=393 ttl=64 time=445 ms
64 bytes from 192.168.1.1: icmp_seq=394 ttl=64 time=411 ms
64 bytes from 192.168.1.1: icmp_seq=395 ttl=64 time=274 ms
64 bytes from 192.168.1.1: icmp_seq=396 ttl=64 time=262 ms
64 bytes from 192.168.1.1: icmp_seq=397 ttl=64 time=282 ms
64 bytes from 192.168.1.1: icmp_seq=398 ttl=64 time=588 ms
64 bytes from 192.168.1.1: icmp_seq=399 ttl=64 time=614 ms
64 bytes from 192.168.1.1: icmp_seq=400 ttl=64 time=222 ms
64 bytes from 192.168.1.1: icmp_seq=401 ttl=64 time=238 ms
64 bytes from 192.168.1.1: icmp_seq=402 ttl=64 time=335 ms
64 bytes from 192.168.1.1: icmp_seq=403 ttl=64 time=461 ms
64 bytes from 192.168.1.1: icmp_seq=404 ttl=64 time=491 ms
#coding=utf8
import threading
import multiprocessing
import time
import Queue
import os
import re
import requests
#SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列
SHARE_Q = multiprocessing.Queue() #构造一个不限制大小的的队列
_WORKER_THREAD_NUM = 2 #设置线程个数
_WORKER_PROCESS_NUM = 8
class MyThread(threading.Thread) :
def __init__(self, func, q) :
super(MyThread, self).__init__()
self.func = func
self.queue = q
def run(self) :
self.func(self.queue)
def worker(q):
session = requests.Session()
while not q.empty():
url, name = q.get() #获得任务
print "Processing : ", url, name
with open(name, ‘w’) as f:
f.write(session.get(url).content)
def main() :
global SHARE_Q
for root,dirs,files in os.walk(‘data’):
for filespath in files:
url = os.path.join(root,filespath)
url = os.path.abspath(url)
temp = []
if url.endswith(‘m3u8’):
path = os.path.dirname(url)
new_m3u8_path = os.path.join(path, ‘new_video.m3u8’)
content = open(url).read()
for x in content.split(’\n’):
if x.startswith(‘http://’):
x = x.split(’?’)[0]
_, name = os.path.split(x)
save_path = os.path.join(path, name)
temp.append(name)
if not os.path.exists(save_path) or os.path.getsize(save_path) == 0:
#print ‘download’, x, save_path
#os.system(‘wget %s -O %s’ % (x, save_path))
SHARE_Q.put([x, save_path])
#with open(save_path, ‘w’) as f:
#f.write(requests.get(x).content)
else:
temp.append(x)
with open(new_m3u8_path, ‘w’) as f:
f.write("\n".join(temp))
#exit()
processs = []
print ‘Queue length’, SHARE_Q.qsize()
def func(q, max_thread_num):
threads = []
for i in xrange(max_thread_num) :
thread = MyThread(worker, q)
thread.start()
threads.append(thread)
for thread in threads :
thread.join()
for i in xrange(_WORKER_PROCESS_NUM):
p = multiprocessing.Process(target=func, args=(SHARE_Q, _WORKER_THREAD_NUM))
p.start()
processs.append§
while True:
time.sleep(10)
print ‘Queue length’, SHARE_Q.qsize()
for p in processs:
p.join()
Python中多进程下载网络拥堵问题如何解决?
楼主如果只是为了下载程序的话, 研究一下 aria2c, 他支持文件形式的批量导入下载链接的…你只需要给他一个 1w 文件的链接即可…多线程, 多文件同时下载占满带宽不是问题
用多进程下载时网络拥堵,主要是因为同时发起的连接数太多,把带宽打满了或者被服务器限制了。核心思路是控制并发连接数,别让所有进程一块儿猛下。
最直接的办法是用进程池(multiprocessing.Pool)或者信号量(threading.Semaphore)来限制同时活跃的下载进程数量。比如,你可以设置一个全局的信号量,每个下载任务开始前先获取信号量,这样就能控制住最大并发数。
下面是个用multiprocessing.Pool配合imap的例子,它能很方便地控制工作进程的数量,避免创建过多进程:
import requests
from multiprocessing import Pool
import os
def download_file(url_path):
url, save_path = url_path
try:
# 流式下载大文件,避免内存爆掉
with requests.get(url, stream=True, timeout=30) as r:
r.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"下载完成: {save_path}")
return save_path, True
except Exception as e:
print(f"下载失败 {url}: {e}")
return save_path, False
def main():
# 你的下载任务列表,格式:(url, 本地保存路径)
download_tasks = [
('https://example.com/file1.zip', './data/file1.zip'),
('https://example.com/file2.zip', './data/file2.zip'),
# ... 更多任务
]
# 关键在这里:设置进程池的大小,比如4个进程,这就是最大并发数
# 根据你的网络和服务器情况调整这个数,一般4-8个就差不多了
max_workers = 4
os.makedirs('./data', exist_ok=True)
with Pool(processes=max_workers) as pool:
# 使用imap有序迭代结果,也可以用imap_unordered如果顺序不重要
for save_path, success in pool.imap(download_file, download_tasks):
if not success:
# 这里可以加失败重试的逻辑
pass
if __name__ == '__main__':
main()
这个方案里,max_workers就是控制并发的阀门。如果还堵,就再把这个数调小点。另外,单个下载任务里用stream=True和分块写入,是为了防止下大文件时内存炸了。
简单说就是:用池子控制进程数,别开太多。
磁盘 io,CPU 负载,网络流量都不贴下啊
带宽没有跑满。 磁盘 io, cpu, 网络流量 都很低,
在服务器上用 tar 把十万个小文件压缩成一个大文件
下载下来
再解压
BBR
我怎么觉得每秒下载速度 4M 很正常…
我是要下载 勤思考研的视频。
每个视频都是一个 m3u8 的文件,每个视频分割成了 200 多个小文件。 需要先把这些小文件下载下来, 再用 ffmpeg 合成一个新文件。
这些小文件一共有 10 多万。下载了约 7 个小时才下载完成。
是不是你用的无线网络下载?
如果你的网络带宽没问题,可能是网卡跑满了,你换成有线再试试
如果用无线网络的话,普通路由 4M 已经满带宽了
电脑路由器都重启一下
可以试下用协程,用 asyncio 和 aiohttp 试一下。我用这两个库写过下载 m3u8 的视频的脚本,下载速度很快。
路由器是水星 mw300R ( 300M ), 笔记本通过 wifi 无线连接 路由器。
能换个好点的路由器吗?
能换个支持 QoS 的路由器吗。
能用有线连接吗?


