Python + pandas + chunksize 如何分块、分组再汇总统计?

有一个很大的文件 内容是一行一个 MD5 值 我需要统计每个 MD5 出现的次数
如果直接 pandas.read_csv 会 MemoryError
一行一行读+字典 也行 但不是我要的

怎么使用 分块读取 然后分组统计再汇总?
loop = True
chunkSize = 100000
chunks = []

while loop:
try:
chunk = data.get_chunk(chunkSize)
chunks.append(chunk)
except StopIteration:
loop = False
print(“Iteration is stopped.”)

df = pd.concat(chunks, ignore_index=True)
Python + pandas + chunksize 如何分块、分组再汇总统计?


9 回复

刚好手上有个类似的数据集,唯一与楼主不同的是每一行是一个[100, 150]的整数,我是这样统计的:

python<br>from collections import Counter<br><br>import pandas as pd<br><br>size = 2 ** 10<br>counter = Counter()<br>for chunk in pd.read_csv('file.csv', header=None, chunksize=size):<br> counter.update([i[0] for i in chunk.values])<br><br>print(counter)<br><br>

大概输出如下:
<br>Counter({100: 41,<br> 101: 40,<br> 102: 40,<br> ...<br> 150: 35})<br>


帖子回复:

这个需求很常见,处理大文件时chunksize配合分组聚合是标准做法。核心思路是:分块读取时,每块都按你的分组键(比如'group_col')进行预聚合(比如求和、计数),存储这些中间结果,最后将所有中间结果按相同分组键再次聚合,得到全局统计。

下面是一个完整的代码示例。假设我们有一个巨大的CSV文件sales.csv,包含region(地区)、amount(销售额)等列,我们需要统计每个地区的总销售额。

import pandas as pd

# 定义文件路径和分组列、统计列
file_path = 'sales.csv'
group_column = 'region'
value_column = 'amount'
chunk_size = 100000  # 每次读取10万行

# 初始化一个字典或DataFrame来存储中间聚合结果
# 这里用字典:键是分组值,值是[总和, 计数](方便扩展其他统计)
intermediate_results = {}

# 分块读取并处理
chunk_reader = pd.read_csv(file_path, chunksize=chunk_size)
for i, chunk in enumerate(chunk_reader):
    print(f"正在处理第 {i+1} 个数据块...")
    
    # 对当前块进行分组聚合(这里求总和)
    # 使用groupby().agg()可以一次性计算多个聚合指标
    chunk_grouped = chunk.groupby(group_column)[value_column].agg(['sum', 'count'])
    
    # 将中间结果累加到字典中
    for group_key, row in chunk_grouped.iterrows():
        if group_key not in intermediate_results:
            intermediate_results[group_key] = [row['sum'], row['count']]
        else:
            # 累加总和与计数
            intermediate_results[group_key][0] += row['sum']
            intermediate_results[group_key][1] += row['count']

# 将最终结果转换为DataFrame
final_result = pd.DataFrame.from_dict(intermediate_results, 
                                      orient='index', 
                                      columns=['total_amount', 'total_count'])
final_result.index.name = group_column
final_result = final_result.sort_index()

print("\n最终分组统计结果:")
print(final_result)

# 可选:保存结果
final_result.to_csv('region_summary.csv')
print("结果已保存至 region_summary.csv")

关键点解释:

  1. pd.read_csv(..., chunksize=chunk_size):返回一个可迭代对象,每次迭代得到一个包含chunk_size行的DataFrame。
  2. 核心逻辑:在for chunk in chunk_reader:循环内部,对每个数据块chunk执行你需要的groupby().agg()操作。这和你处理单个完整DataFrame的代码几乎一样。
  3. 中间聚合:我们不能直接把这些块的分组结果简单拼接,因为同一个分组键可能出现在多个块里。所以需要用一个容器(这里用了字典intermediate_results)来存储和累加每个分组键的中间统计量(如总和、计数、平均值等)。
  4. 最终汇总:所有块处理完后,intermediate_results里存储的就是每个组的全局总和与计数。将其转换为DataFrame就是最终结果。如果你还需要平均值,可以用total_amount / total_count计算。

如果分组键是多个列,比如按['region', 'product']分组,只需将group_column改为列表,并在累加时使用元组作为字典的键即可:

group_columns = ['region', 'product']
chunk_grouped = chunk.groupby(group_columns)[value_column].agg(['sum', 'count'])
for idx, row in chunk_grouped.iterrows():
    # idx现在是一个(region, product)元组
    if idx not in intermediate_results:
        ...

总结建议:分块预聚合后合并,内存友好。

Counter() 或者 Mapreduce 的思想做哦~

dask 一行搞定。

dd.groupby().count(),和 pandas 一样的 API,但是把 fill in memory 拓展到 fill in disk。

Counter 可以试试,有分布式观景首选 mapreduce

分布式环境

学习新姿势,一般数据量大都是实用 spark 完成计算,刚去看了下 dask,不错的包

sort | uniq -c

用 shell 比较简单

cat x.txt | sort | uniq -c

回到顶部