Python调用Elasticsearch的bulk接口批量插入数据时出现内存泄漏导致OOM问题如何解决
数据导入脚本如下
import time
import sys
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
reload(sys)
sys.setdefaultencoding(‘utf-8’)
def set_mapping(es, index_name = “content_engine”, doc_type_name = “en”):
my_mapping = {
“en”: {
“properties”: {
“a”: {
“type”: “string”
},
“b”: {
“type”: “string”
}
}
}
}
create_index = es.indices.create(index = index_name,body = my_mapping)
mapping_index = es.indices.put_mapping(index = index_name, doc_type = doc_type_name, body = my_mapping)
if create_index[“acknowledged”] != True or mapping_index[“acknowledged”] != True:
print “Index creation failed…”
def set_data(es, input_file, index_name = “content_engine”, doc_type_name=“en”):
i = 0
count = 0
ACTIONS = []
for line in open(input_file):
fields = line.replace("\r\n", “”).replace("\n", “”).split("----")
if len(fields) == 2:
a, b = fields
else:
continue
action = {
“_index”: index_name,
“_type”: doc_type_name,
“_source”: {
“a”: a,
“b”: b,
}
}
i += 1
ACTIONS.append(action)
if (i == 500000):
success, _ = bulk(es, ACTIONS, index = index_name, raise_on_error = True)
count += success
i = 0
ACTIONS = []
success, _ = bulk(es, ACTIONS, index = index_name, raise_on_error=True)
count += success
print("insert %s lines" % count)
if name == ‘main’:
es = Elasticsearch(hosts=[“127.0.0.1:9200”], timeout=5000)
set_mapping(es)
set_data(es,sys.argv[1])
数据大概 5 个 G 吧,机器配置虚拟机 24G 内存,刚开始无内存泄露现象,这个 Python 脚本的进程内存一直保持 1G 左右的占用,当插入 1600 w,内存开始持续飙升,最后达到 22G ,导致触发 OOM 机制, Python 进程被内核 kill ,差点怀疑人生。。大家在遇到 Python 内存泄露都是怎么定位的?
Python调用Elasticsearch的bulk接口批量插入数据时出现内存泄漏导致OOM问题如何解决
1 、 gc
2 、 objgraph
5w bulk 一次,再不行重新建立下 es 对象试试
没有人对你这么烂的代码感兴趣,这是事实,必须承认.
试试,找个同事或者同学,然后口述你代码逻辑,也许你会自己发现问题~
你要是发现这代码哪里导致的内存泄露,就说出来,我承认我是渣渣没问题的。
其实我本意不是说你代码烂.
内存泄露一般出现在循环里面向循环外的容器塞数据,导致内存泄露.
你代码里的 ACTIONS 变量,在循环里面每次都塞一些数据,然后直到函数结束才释放.
也就是说, ACTIONS 里面包含整个文件的数据?
5G 的文件啊,哥.
参考这里: https://github.com/elastic/elasticsearch-py/issues/297
1.试试用 generator 改写,
2.因为 bulk 调用 streaming_bulk ,试试调整 chunk_size 、 max_chunk_bytes : http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.streaming_bulk
我试过减少 bluk 到 5w ,内存依然炸裂的
我是进程运行一段时间之后产生的内存泄露,有啥工具可以注入 Python 进程查看 gc 情况吗?
晚上回去试试。
官网给的推荐是 1,000 to 5,000 条数据,文件大小是 5-15MB , https://www.elastic.co/guide/en/elasticsearch/guide/master/bulk.html
有个思路是用 linux 的切割命令: split -l 5000 input_file
再就是用多线程进行批量导入,线程数量最好是 200 个左右
有个思路是用 linux 的切割命令: split -l 5000 input_file
再就是用多线程对分割的文件 进行批量导入,线程数量最好是 200 个左右
没用过 python es 的库,但是看你的代码,如果 es 存了 ACTIONS 这个 list 的引用,有可能有内存泄露。把 ACTIONS = []改成 del ACTIONS[:]试下?
嗯,我看了你的链接,官方的意思是推荐从一次导入 1000-5000 条开始测试直到找到最佳 performance 吧, 可能我的不是最佳,但是和这个应该没有关系,分割为小文件我导入我想过(现在我朋友推荐我使用 Java 的 API 用 9300 端口走 TCP 导入),但是我其实想找到内存泄露的原因呢。
试过了,依然 oom ,我还试过 del 之后用 gc 库显示回收 gc ,也是炸裂。
结帖了,在 github 提了[issue]( https://github.com/elastic/elasticsearch-py/issues/508),是我姿势不对。。
虽然已结贴,但是我还想问下,如果把值调成 5000 ,会出现内存泄露不?因为看了下 github 上的生成器,给我的感觉是一次性导入数据,不知道我有没有看错,如果这样的话,效率会比较低吧。
可以在内存飙升的时候看看具体是消耗在哪了。
貌似有 guppy 之类的工具可用?
晚上我测试完了给你结果,我觉得还是会泄露, github 那个它说 bluk 内部有 chunking ,默认好像是 chunking size 是 5000 吧,理解为 5000 个 documents 请求一次 es 的 API 就行。
我取 stackoverflow 提问,有人推荐 pypi.python.org/pypi/memory_profiler ,但是我这个情况还是不适用。
哥,您那个改成生成器的方式肿么写的?能贴个代码出来么?感激不尽。。。


