Python中如何高效操作大文件并更新MongoDB数据库
题目的表达可能有些问题。 情况是这样的: 现有已经存在大量数据的 MongoDB 集合,现在要添加一个新的字段,然后这样字段的值来自于一个 json 文件。不过这样文件特别大( 4.5G )在使用 Python 中 json 的 loads ()来操作会卡死,因为 12G 的内存更本弄不了。我还想过 split 来处理 json 文件,但是生成的文件太多,造成在遍历文件的时候卡主了。 至于为什么不用 MongoDB 自带的处理 json ,是因为 json 有些字段不需要。
不过我想想,目前要是能导入进去就可以了。
问题就是怎么去更新呢?
谢谢
Python中如何高效操作大文件并更新MongoDB数据库
这个不太懂,不过如果没有其它好办法的话能不能用 MongoDB 自带的先导入到一个临时库里再去掉不需要的部分?
对于大文件处理和批量更新MongoDB,核心是分块读取和批量写入。别用read()或readlines()一次性加载整个文件,内存会爆。用pymongo的bulk_write()来批量操作,效率比单条insert_one()高几个数量级。
下面是一个处理大CSV文件并更新MongoDB的典型代码:
import csv
from pymongo import MongoClient, UpdateOne
from itertools import islice
def process_large_file_and_update_mongodb(file_path, batch_size=1000):
"""
分块读取大CSV文件,并批量更新MongoDB。
:param file_path: CSV文件路径
:param batch_size: 每批处理的记录数,默认1000
"""
client = MongoClient('mongodb://localhost:27017/')
db = client['your_database']
collection = db['your_collection']
operations = [] # 用于累积批量操作
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f) # 假设CSV第一行是列名
while True:
# 使用islice分块读取,避免内存溢出
batch = list(islice(reader, batch_size))
if not batch:
break # 文件读取完毕
for row in batch:
# 构建查询条件,例如根据唯一ID
query = {'_id': row['id']}
# 构建更新操作,$set表示设置字段
update = {'$set': {'name': row['name'], 'value': row['value']}}
# 使用UpdateOne累积操作
operations.append(UpdateOne(query, update, upsert=True))
# 批量执行
if operations:
collection.bulk_write(operations)
operations.clear() # 清空操作列表,准备下一批
print(f"已批量处理 {batch_size} 条记录")
client.close()
print("处理完成")
# 使用示例
process_large_file_and_update_mongodb('large_file.csv', batch_size=5000)
关键点:
- 分块读取:用
itertools.islice或生成器逐块读取文件,内存占用恒定。 - 批量操作:用
pymongo.bulk_write()配合UpdateOne,设置upsert=True可实现存在则更新、不存在则插入。 - 调整批次大小:根据你的数据和硬件调整
batch_size,通常在1000到10000之间找平衡点。
如果文件不是CSV,是JSON或日志,思路一样:换对应的解析方式(如json.loads()),但分块和批量更新的核心不变。
一句话建议:用生成器分块读,用bulk_write批量更新。
使用游标读取数据,手动设置一次更新的数据条数,时间换空间,另外这种可以考虑一下协程或者多进程操作的吧?
你这问题应该和 mongodb 无关吧,主要问题是如何用 python 操作大的 json 文件读写数据
之前遇到过类似的问题,从一个超大的文本文件倒入到数据库,比你这个还大, 10G 每个文件,本来用的 mongodb ,测了一次以后我转到 postgresql 去了……
MongoDB 有 aggregate pipeline / bulk 操作, up 可以查一查
不生成文件对应的 JSON 对象,直接解析 .json 文件,使用正则表达式或者其他什么手段来提取数据。
你是说用 split(1) 处理?如果可以用分行处理,那也可以直接用 python 的 for line in file_object 来逐行处理啊
既然 Python 都参与了,没有对数据做 pagination 么?

