Python中如何高效操作大文件并更新MongoDB数据库

题目的表达可能有些问题。 情况是这样的: 现有已经存在大量数据的 MongoDB 集合,现在要添加一个新的字段,然后这样字段的值来自于一个 json 文件。不过这样文件特别大( 4.5G )在使用 Python 中 json 的 loads ()来操作会卡死,因为 12G 的内存更本弄不了。我还想过 split 来处理 json 文件,但是生成的文件太多,造成在遍历文件的时候卡主了。 至于为什么不用 MongoDB 自带的处理 json ,是因为 json 有些字段不需要。

不过我想想,目前要是能导入进去就可以了。

问题就是怎么去更新呢?

谢谢


Python中如何高效操作大文件并更新MongoDB数据库

9 回复

这个不太懂,不过如果没有其它好办法的话能不能用 MongoDB 自带的先导入到一个临时库里再去掉不需要的部分?


对于大文件处理和批量更新MongoDB,核心是分块读取批量写入。别用read()readlines()一次性加载整个文件,内存会爆。用pymongobulk_write()来批量操作,效率比单条insert_one()高几个数量级。

下面是一个处理大CSV文件并更新MongoDB的典型代码:

import csv
from pymongo import MongoClient, UpdateOne
from itertools import islice

def process_large_file_and_update_mongodb(file_path, batch_size=1000):
    """
    分块读取大CSV文件,并批量更新MongoDB。
    :param file_path: CSV文件路径
    :param batch_size: 每批处理的记录数,默认1000
    """
    client = MongoClient('mongodb://localhost:27017/')
    db = client['your_database']
    collection = db['your_collection']

    operations = []  # 用于累积批量操作

    with open(file_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)  # 假设CSV第一行是列名

        while True:
            # 使用islice分块读取,避免内存溢出
            batch = list(islice(reader, batch_size))
            if not batch:
                break  # 文件读取完毕

            for row in batch:
                # 构建查询条件,例如根据唯一ID
                query = {'_id': row['id']}
                # 构建更新操作,$set表示设置字段
                update = {'$set': {'name': row['name'], 'value': row['value']}}
                # 使用UpdateOne累积操作
                operations.append(UpdateOne(query, update, upsert=True))

            # 批量执行
            if operations:
                collection.bulk_write(operations)
                operations.clear()  # 清空操作列表,准备下一批
                print(f"已批量处理 {batch_size} 条记录")

    client.close()
    print("处理完成")

# 使用示例
process_large_file_and_update_mongodb('large_file.csv', batch_size=5000)

关键点:

  1. 分块读取:用itertools.islice或生成器逐块读取文件,内存占用恒定。
  2. 批量操作:用pymongo.bulk_write()配合UpdateOne,设置upsert=True可实现存在则更新、不存在则插入。
  3. 调整批次大小:根据你的数据和硬件调整batch_size,通常在1000到10000之间找平衡点。

如果文件不是CSV,是JSON或日志,思路一样:换对应的解析方式(如json.loads()),但分块和批量更新的核心不变。

一句话建议:用生成器分块读,用bulk_write批量更新。

使用游标读取数据,手动设置一次更新的数据条数,时间换空间,另外这种可以考虑一下协程或者多进程操作的吧?

你这问题应该和 mongodb 无关吧,主要问题是如何用 python 操作大的 json 文件读写数据

之前遇到过类似的问题,从一个超大的文本文件倒入到数据库,比你这个还大, 10G 每个文件,本来用的 mongodb ,测了一次以后我转到 postgresql 去了……

MongoDB 有 aggregate pipeline / bulk 操作, up 可以查一查

不生成文件对应的 JSON 对象,直接解析 .json 文件,使用正则表达式或者其他什么手段来提取数据。

你是说用 split(1) 处理?如果可以用分行处理,那也可以直接用 python 的 for line in file_object 来逐行处理啊

既然 Python 都参与了,没有对数据做 pagination 么?

回到顶部