Python中如何高效地将10亿级JSON数据插入PostgreSQL并充分利用单台服务器性能?
- 问题说明:
目前有一个文本格式的 json 数据,里面数据量约 15 亿,每一行数据的格式都是固定的,插入前先要将 json 反序列化。运行环境为:windows server 2016,postgresql 10,Xeon [email protected],16G ddr3 1333,硬盘 io 上限约 120MB/s,服务器板载 RAID 无法使用,用 windows 的带区卷将两块硬盘组成一个卷,极限 io 也就 170MB/s,性价比太低就没做,直接使用了单块硬盘。
2. 思路和伪代码:
基本思路,遍历 json 文本,每 100 万行文本做一次插入。插入时,将 100 万行文本切割成小的分组用多线程的方式并行插入,每个线程每次都建立一个新的数据库连接并处理一个分组。待 100 万行文本处理完毕后再继续遍历 json。
> 首先进行一些数据库的基本优化:
* 创建数据库和表,将表设置为 unlogged
* 开启 postgresql 的异步提交
python <br># python 伪代码 <br> <br>def do_insert(rows): <br> # 每次插入都建立新的连接 <br> conn=psycopg2.connect() <br> cur=conn.cursor() <br> # 遍历 rows,进行 json 反序列化,提取数据并构造 sql 语句,执行 sql 语句将 rows 一次性插入到数据库 <br> <br> for row in rows: <br> v = json.loads(row) <br> insert_sql = "insert into ... values (%s,%s)" % (v[1], v[2]) <br> cur.execute(insert_sql) <br> cur.commit() <br> conn.close() <br> <br>def insert(Rows): <br> # 将 Rows 切割成 100 份,获得 100 个 rows,启用 n 个 do_insert 线程 <br> rows_list = split_list(Rows, 100) <br> pool = threadpool.ThreadPool(n) <br> requests = threadpool.makeRequest(do_insert, rows_list) <br> [pool.putRequest(req) for req in requests] <br> pool.wait() <br> <br>def main(): <br> # 载入 json 文本数据源 <br> # 按行读取,每读取 100 万行数据调用一次 insert() <br> with open('import.json','r') as f: <br> index=0 <br> Rows=[] <br> for line in f: <br> Rows.append(line) <br> index += 1 <br> if index % 1000000 == 0: <br> insert(Rows) <br>
3. 目前尝试了几种参数组合,最终使用的是 10 个线程,每个线程插入 1 万条,每 100 万条做一次多线程批量插入耗时约 2min,平均插入速度约 8400 条 /s,跑完 15 亿条数据大约要 2 天。
python 执行到稳定状态后:占用内存约 1G,cpu 占用率~30%,cpu 线程数持续缓慢上升(似乎没有回收线程)。
总的 CPU 使用率一直保持在~80%,其中 python 只占 30%,另外有大量的 postgres.exe 进程,这部分应该占用了较多的 cpu。硬盘写 io 不固定,峰值 30M/s、其余时间都是 5M/s 以下,速度明显不行。
4. 初步分析
对每个 python 语句的执行时间进行统计,发现主要的业务都在 do_insert 内,也就是具体执行插入操作的线程。不知道跟 json.loads 有无关系,还要进一步验证。但是感觉 python 效率以及程序处理逻辑上还存在缺陷,所以没有去进一步优化。
插入线程越多,postgresql 进程会越多,这样是否会导致 cpu 使用不平衡。
此外,是否还有其他可以优化的地方,无论是从语言上,还是处理逻辑上,还是数据库配置上。
Python中如何高效地将10亿级JSON数据插入PostgreSQL并充分利用单台服务器性能?
之前将几千万条股票数据插入 mongo 大概也是这个思路, 一台办公的笔记本执行程序, mongo 在内网的服务器上, 基本都能达到你这速度.
核心思路: 别用单条INSERT硬怼,用COPY或批量插入,配合连接池和异步IO把硬件跑满。
具体做法:
- 数据准备与分批:
import ijson
import asyncio
from asyncpg import create_pool
import ujson
async def batch_insert():
# 1. 用ijson流式读取大JSON文件,避免内存爆炸
with open('huge_data.json', 'r') as f:
objects = ijson.items(f, 'item') # 根据实际JSON结构调整路径
# 2. 分批处理,每批5000-20000条
batch = []
for obj in objects:
batch.append((
obj['id'],
ujson.dumps(obj['data']), # 如果字段是JSONB
obj['timestamp']
))
if len(batch) >= 10000:
await insert_batch(batch)
batch = []
if batch:
await insert_batch(batch)
async def insert_batch(batch):
# 3. 用asyncpg连接池,控制并发连接数(通常CPU核数*2 + 1)
pool = await create_pool(
user='user', password='pass',
database='db', host='localhost',
min_size=5, max_size=20 # 根据服务器内存调整
)
async with pool.acquire() as conn:
# 4. 使用COPY FROM(最快)
await conn.copy_records_to_table(
'your_table',
records=batch,
columns=['id', 'data', 'timestamp']
)
# 或者用 executemany(次选)
# await conn.executemany(
# "INSERT INTO table VALUES($1, $2, $3)",
# batch
# )
- 关键配置:
- PostgreSQL配置:
max_connections调高,shared_buffers设为内存25%,wal_buffers16-32MB,synchronous_commit = off(导入期间) - 表设置:导入前删索引,导入后重建;用
UNLOGGED表(数据可丢失时)
- 如果JSON是数组文件:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
# 分块读取 + 批量插入
chunk_iter = pd.read_json('data.json', lines=True, chunksize=50000)
engine = create_engine('postgresql://user:pass@localhost/db')
for chunk in chunk_iter:
chunk.to_sql('table', engine, if_exists='append', index=False, method='multi')
一句话总结: 流式读取 + 批量COPY + 连接池 + 调参。
不太了解 Postgresql,但是我之前导入 40G 的 JSON 到 MongoDB,使用了 gevent 的 monkey_patch(),它可以让 pymongo 支持异步,性能比多线程高很多。
[服务器板载 RAID 无法使用,用 windows 的带区卷将两块硬盘组成一个卷,极限 io 也就 170MB/s]
这算软件 RAID0 ?胆子够壮… 这要是纯玩耍环境,无所谓,正式环境…老板知道了,肯定打死你。
机械硬盘 IOPS 很低的,极大限制 DB 操作。
也不是正式的生产环境,算是提供一些数据支撑吧,最后还是用了单块硬盘。
机械硬盘的顺序写速度还是可以的,楼主这种情况基本上就是顺序写。好点的硬盘可以达到几百 M/s,不过随机 IOPS 确实低,这个没办法。
两块盘单独用是非常正确的做法
最起码有 3 种:
1. 用 COPY https://stackoverflow.com/questions/8144002/use-binary-copy-table-from-with-psycopg2/8150329#8150329
2. 多值插入 + 异步 调整参数 synchronous_commit=off 和 commit_delay 降低 fsync 消耗
3. 用 TRANSACTION 把批量插入动作包含在内,一次性提交,跟 2 差不多
copy_from 你值得拥有
以下纯靠 Google 搜索跟已有知识猜的,本人没试过。
搜索以下发现 postgres 有叫 COPY 的命令,感觉会比 insert 快一些…… 大概方案是,先用 python 处理 json 生成适用于 COPY 命令的文件 A,然后为了速度,最好把这个文件放在内存盘中,然后调用 postgres 的 COPY 倒入进去。循环。根据内存大小决定每次处理多少个。
github 上有个项目是帮助导入 csv 和 json 的
另外就是用 postgresql 的 copy 命令
pgfutter 这个工具导入 json 时需要先将 json 整行作为一个单元格导入,然后利用 postgres 对 json 类型的支持,重新插入到一个新表里面。不过这个工具真的挺快的。但因为我们要对 json 做一些简单的过滤,所以流程上还是感觉复杂了点。
先根据条件过滤,分成几个文件,然后用 copy,尽量利用数据库提供的工具,他会绕过一些数据库的转换等,如 oracle 的 sqlldr 就比批量 insert 要快
不会 python 的吧,PIL 的存在,首先就应该考虑多进程。
copy 果然强悍,拿 10G 数据做测试,硬盘 io 平均可以到 60M/s,峰值有 110M/s。
连续读写少量文件还 OK
[硬盘 io 平均可以到 60M/s,峰值有 110M/s。] … 2009 年硬盘就可以达到了吧。
万全 r510。。。。
pg 的话,大规模导入也是用 copy 的接口吧,
另外可以考虑 pg_loader,看看能用不。

