Python中如何使用pymongo将查询结果转换成CSV?

我有一个需求,就是将 mongo 的结果通过 python 转换成 csv,但是遇到这类数据结构就不知道怎么处理:

{
"_id": xxxxzx
“ uid ”: 1378914,
 "User":[
{
'0':{"is_pay": true,
  "product": "清洁器"
},
'1':{"is_pay": true,
  "product": "大彩电"
}
}
],
“ is_VIP ”: "",
"last_time": ISODate("xxxxxx")
"history": ["xxxx", "xxxx"]
}

有什么办法能将内嵌的字段的 jkey 转化成CSV header, 将 list 里面的 value 转换成“ xxx, xxx ”呢?


Python中如何使用pymongo将查询结果转换成CSV?

17 回复

额。。这不是很简单么。。你用 python 写个 for 循环


import csv
from pymongo import MongoClient

# 连接MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['your_database']
collection = db['your_collection']

# 执行查询
cursor = collection.find({})  # 这里可以添加查询条件,比如{"status": "active"}

# 指定CSV文件名
csv_file = 'output.csv'

# 获取所有文档,提取字段名(使用第一个文档的键)
all_docs = list(cursor)
if all_docs:
    # 获取所有可能的字段名(合并所有文档的键)
    fieldnames = set()
    for doc in all_docs:
        fieldnames.update(doc.keys())
    
    # 写入CSV
    with open(csv_file, 'w', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=sorted(fieldnames))
        writer.writeheader()
        
        for doc in all_docs:
            # 处理嵌套文档(这里简单转换为字符串,复杂情况需要递归处理)
            row = {}
            for key in fieldnames:
                value = doc.get(key)
                if isinstance(value, (dict, list)):
                    row[key] = str(value)  # 复杂类型转为字符串
                else:
                    row[key] = value
            writer.writerow(row)
    
    print(f"数据已导出到 {csv_file}")
else:
    print("没有找到数据")

这个代码做了几件事:先连接MongoDB并执行查询,然后自动提取所有文档中的字段名作为CSV表头。处理数据时会把字典或列表这类嵌套结构转成字符串,避免CSV格式问题。如果你需要处理更复杂的嵌套数据,可能得写个递归函数来展平结构。

简单说就是查数据、定表头、处理嵌套、写文件。

用内置的 csv 模块 for 循环输出



问题是内嵌的字段,问题是不确定 key 的名称,这个 schema 长度是不固定的

那就遍历两遍…

好吧 看来我要告诉你一个神器了 常见格式数据一键自动转换 数据处理懒人必备库
python 第三方库 Tablib
https://github.com/kennethreitz/tablib

你的前提前提是知道有多少嵌套对象,但是我是不知道的,你怎么确定写两遍一定可以呢?

感谢哦,我看看能不能解决

第一遍取完所有的 key,第二遍填内容,怎么可能不可以?

当然不行,如果是内嵌的话, 例如:

{
“ User ”:[
“0”:{“is_pay”: true,
“product”: “清洁器”
},
“1”:{“is_pay”: true,
“product”: “大彩电”
}]
}

这里面有三层,你一次根本弄不完

为什么取不完…

show your code

同样需求的表示持续关注

keys=set()
for key in a.keys():
…keys.add(key)
for goods in a[‘User’]:
…for good in goods.values():
…for key in good.keys():
…keys.add(key)

有了 json 的所有 keys 之后,你再根据这些 key 去找对应的数据填充,User list 有几个成员最后就会产生几行

感謝你的回答,但是我这个不是一个好主意,因为如果内嵌更多数据后,依然无法全部解压出来,需要写更多的 for 循环

我用的方法,先将 JSON 转成 python dict,然后扁平化,最后用 tablib 去处理。

扁平化代码:

def flatten_json(maps, delimiter):
“”"
flatten the Map object

:param maps: the map object
:param delimiter: the delimiter symbols
:return: A new dict object

“”"

val = {}

for i in maps.keys():

if isinstance(maps[i], dict):
get = flatten_json(maps[i], delimiter)

for j in get.keys():
val[i + delimiter + j] = get[j]

elif isinstance(maps[i], (tuple, list)):

if maps[i]:
for item_seq in xrange(len(maps[i])):
item = maps[i][item_seq]
if not item:
continue

elif isinstance(item, (str, unicode)):
val[i] = ‘;’.join(maps[i])
break

elif isinstance(item, dict):

for j in item.keys():
val[i + delimiter + j+’:’+str(item_seq)] = item[j]

else:
val[i] = None

else:
val[i] = maps[i]

return val

需要所有的 key 么?如果有固定列表就好了,不过看了下评论好像没有。

回到顶部