使用 Python 处理大文件有什么推荐的方式么?

大概五六个 G 的文件,目前分割成了数个小文件,使用 multiprocessing 这种处理了一下,但是效率还是太低了

请问哪位有什么推荐的方法?

另外,我在尝试使用 pp 这个库,不过总是提示变量未定义,实际上已经定义了,使用 multiprocessing 这种都不会提示这个错误,使用的代码如下:

cpu_num = 8
job_server = pp.Server(cpu_num)
a =  []
for f in xxxxx:
    a.append(job_server.submit(func_name, (f, )))
for x in a:
    tmp = x()

有这方面经验的朋友帮一下忙,感谢


使用 Python 处理大文件有什么推荐的方式么?

47 回复

话说为什么不能 append 了…

再说具体一些吧,就是把文件内容读出来做一些处理然后存到另外的文件去。

不知哪位处理过类似的需求


处理大文件的核心思路是避免一次性加载到内存。推荐使用迭代读取和流式处理。

1. 文本文件:逐行读取

with open('large_file.txt', 'r', encoding='utf-8') as f:
    for line in f:  # 每次只读取一行到内存
        process(line)  # 你的处理函数

2. CSV文件:使用csv模块迭代

import csv
with open('large.csv', 'r') as f:
    reader = csv.reader(f)
    for row in reader:
        process_row(row)

3. 二进制文件:分块读取

chunk_size = 1024 * 1024  # 每次读取1MB
with open('large.bin', 'rb') as f:
    while chunk := f.read(chunk_size):
        process_chunk(chunk)

4. Pandas处理大CSV:指定chunksize

import pandas as pd
chunk_iter = pd.read_csv('huge.csv', chunksize=10000)
for chunk in chunk_iter:
    process_dataframe(chunk)

5. 使用生成器管道处理

def read_lines(filename):
    with open(filename) as f:
        for line in f:
            yield line.strip()

def filter_lines(lines, keyword):
    for line in lines:
        if keyword in line:
            yield line

# 使用
lines = read_lines('large.log')
filtered = filter_lines(lines, 'ERROR')
for line in filtered:
    print(line)

关键工具总结:

  • open()的迭代读取
  • csv.reader逐行解析
  • pandas.read_csv(chunksize=)
  • 生成器(yield)构建处理管道
  • 对于超大数据考虑DaskPySpark

一句话建议:用迭代代替一次性加载,内存不够就分块。

之前的办法大概需要多久…

pyspark 跑个本地 spark

你觉得瓶颈在哪里,搞清这个最重要

现在的效率是什么样的,你想达到的预期是什么样呢?

这个量级的文件跑几个进程去处理不至于会太慢,再不济三楼说的上 spark 也能比较快的解决,只是不知道你要求的效率什么样的。

别用 Python,,,我去年做个实验,处理 1g 的数据,由于写的 Python 比较渣,一天只处理了几 m,花了半个多小时,写了个 c 版本,十几分钟就跑完了。。

Python 写了个导出一个游戏数据的文本,要 3 ~ 4 秒的样子,用 c 写了个,1 秒不到。。。

我会尝试用 go


是你自己的锅的可能性远大于 Python

spark 或 dask 试试

大概不到 1kw 行的数据,现在的速度是 1h 处理 5-6w … 要求的速度是不到一天处理完

才 1G,用 JS 都不需要 1 天啊…

刚刚试了试。。妈蛋服务器上 spark 环境有问题

300 多万 html,压缩后 80G,python 多进程在 4c8t 处理器上大约跑了 2 个小时做 html 解析,因为都是小文件,机械磁盘 IO 瓶颈,后期放到 ssd 上好了很多。供参考

以前弄过一些组学的文件,如果是纯文本的话不要用 for…in…语句,用 readlines 读入几千行后再用 readline 处理,如果是 utf8 之类非等长的字节编码的话先按 byte 编码读入文本必要时再解析成别的,最后把文件塞到内存盘再加个多进程。如果要求不高用 bash+awk 比 python 快很多。

感觉是读取后处理文件比较慢…所以尝试分成了小文件,然后发现速度也没快,囧

读取多行压缩放 redis 用多台机器消费 之前处理 mysql 审计日志 30g 大概一个小时搞定 3 台 i5 机器

楼主先确定瓶颈在哪里,用 C++重写也就只有 2 ~ 3 倍的提升而已。对于大文件,Windows 下可以用内存映射的方式,多线程分别映射不同区域同时处理。8 核跑满应该也能提升个 5 ~ 6 倍吧

目前用了一些 py 的库,再改 C 的话感觉时间来不及…捉急

唔,还不会写 go


我去搜一下


这个速度很快了啊,多进程用的什么库呢


纯文本。读取后处理的过程用到了 py 的库……所以目前想看看使用 py 有没有什么解决办法


公司服务器…我装个什么东西都得审批,周末是没什么希望了


对,感觉其他语言重写可能也达不到我要求对速度。 不是 windows,跑在服务器上,后期还要搞成定时任务…所以要求肯定要一天内跑完

multiprocessing 啊

Linux 应该也有类似的机制,可以查下文档。还是建议 profile 先,先从算法本身优化,热点再用 C++写一下。如果你要干的事情确实就有这么多,那你唯一可以干的就是把 CPU 吃满,换 SSD,加机器

感觉 python 原生解析文本的几个函数效率很低,没压缩只有几个 G 的文件问题可能在这里,应该和多进程无关

很可能,哈哈。

>>现在的速度是 1h 处理 5-6w
6w/60/60 = 16.66666666666667

1 秒不到 17 条?
不知道读出来做的什么处理,所以不确定是不是出问题的地方。
你先试试只读取+处理不写看看性能,怀疑每次写都强制 sync 了。

我有处理 40G 文本文件的经验。

我的文本文件每一行都是一个 JSON,用 Python 读取出来,再把每一行的 JSON 转成字典并插入到 MongoDB 中。使用 Python 的 readline()一行一行读,凑够了 10000 个字典以后一次性插入 MongoDB,亲测单线程单进程 4 个小时不到就跑完了。


单线程处理一下,看一下读取 /处理 /写入分别的耗时是多少
找到瓶颈才好优化

建议你重点在优化算法逻辑处理部分,py 读 /写文件不是关键点。
尝试下 pypy 看能不能改善。

经你这么一说,仿佛明白了一些,做的解密处理…应该是这个原因

去掉解密,只读写没什么问题,很快…


解析倒是没啥问题,只是 split 一下;问题应该在处理对过程,我做的解密处理…


现在用了这个,效果不太好。 试了一下 pp 想并行一下,结果出现了题目里对错误


这么一说更觉得是我处理过程导致的速度慢了,我做的解密处理……

嗯,主要是数据处理这里,我处理的是加解密,瓶颈应该是在这里… pypy 无力,在服务器上,装个软件都是运帷去搞

在想办法提高看看怎么并行处理一下


嗯,瓶颈应该是解密这里了…目前还没有找到好的办法

楼上说的 dask 还有我尝试的 pp 居然都碰到了错误…… pyspark 环境也有问题,想死

假设一小时单线程能处理 6w 行,那四核(超线程在这里的用途应该不大)的服务器一天也只能跑 576w 行呢。所以你有可能需要换一个更快的库或者换一个语言(

加解密你找个有 C 的模块,快起码不是一个数量级。

或者你说说你现在有的啥模块?

服务器上跑的话,如果这样算,也只能尝试多几用几个核了……


用的 pycrypto …有什么推荐的模块么


刚刚试了一下 dask

<br> from dask import delayed<br> #import dask.bag as db<br> L = []<br> for fn in en_files:<br> # b = db.read_text(fn)<br> a = delayed(fun_name)(fn) # Delay execution of function<br> L.append(a)<br> result = delayed(L)<br> result.compute()<br>
使用这种方式处理,发现和下面这种方式处理效率相似,看了看 cpu 占用率也很相似,大概都是单核 100%左右

<br> #pool = ThreadPool(20)<br> #pool.map(decrypt_file, encrypt_files)<br> #pool.close()<br> #pool.join()<br>

话说这是我使用方式的问题,还是什么?

#34 所以差不多要 8 核 16 线程才能满足呢

#35

- -。要是可以的话也想试试,先跑一下看看
用 pp 试了一下总是提示变量未定义

可能还是在 io,for fn in en_files 这个依赖系统硬盘缓存的,如果行数多但每行字少的时候很慢,主进程一次一行的分配任务会把主进程的 cpu 耗尽,不如一次 readlines 几千行塞到各个进程里在内存里由各个进程逐行处理。而且不知道为什么 python 解析 utf8 奇慢,全英文的转成其他编码的再处理。我以前遇到是这种问题。

难道都没看到楼主要处理的主要是加解密吗?瓶颈绝对值这啊!加解密模块用 C 来写吧!

找个能支持 CPU 硬件加解密加速器的库 /加解密库试试

最简单的答案是换 java/c/c++

有性能要求的 Python 永远不是第一选择

换 pypy 可解(应该

问题应该是在加解密这里,读取存储试了一下如果没有加解密还是挺快的


感谢,回头试一下


对,瓶颈是这里,很费资源,该考虑换一种方式了




还能这样,我赶紧搜一下


呜呜,服务器现在还是 py2.6 呢。。装 2.7 都得给运帷打申请


那尝试一下 java 好了。。

这种玩意 感觉上 Cython 或者 Pypy 效果会很不错

效果应该会好一些,但是可能还是不太符合性能上的要求。

我用 pp 库把 CPU 跑满了,最后可以跑出来,但是消耗的时间依然很久,感觉用集群处理比较合适。

准备看看怎么搞一下

回到顶部