When I try to send bulk_data to my local Elasticsearch, a SerializationError prevents my data from being loaded.
I have already tried filling in the blank cells of the CSV file, but that did not solve it.
from elasticsearch import Elasticsearch

bulk_data = []
header = []
count = 0
for row in csv_file_object:
    if count > 0:
        data_dict = {}
        for i in range(len(row)):
            data_dict[header[i]] = row[i].rstrip()
        op_dict = {
            "index": {
                "_index": INDEX_NAME,
                "_type": TYPE_NAME,
            }
        }
        bulk_data.append(op_dict)
        bulk_data.append(data_dict)
    else:
        header = row
    count = count + 1

# create ES client, create index
es = Elasticsearch(hosts=[ES_HOST])
if es.indices.exists(INDEX_NAME):
    print("deleting '%s' index..." % (INDEX_NAME))
    res = es.indices.delete(index=INDEX_NAME)
res = es.bulk(index=INDEX_NAME, body=bulk_data, refresh=True)
See the attached image for the SerializationError and the bulk_data values. Note: the `\n` is added by the serialization process itself.
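A SerializationError from the elasticsearch client usually means the body contains values the JSON serializer cannot handle (for example `NaN` floats coming from empty CSV cells). A minimal sketch of sanitizing each value before building `data_dict` (the `clean_value` helper and the sample `row`/`header` are illustrative, not part of the original code):

```python
import json
import math

def clean_value(v):
    # Replace values the JSON serializer chokes on (None, NaN) with an empty string
    if v is None:
        return ""
    if isinstance(v, float) and math.isnan(v):
        return ""
    return str(v).rstrip()

# hypothetical row with a blank (NaN) cell and a trailing newline
header = ["name", "score", "age"]
row = ["alice", float("nan"), "42\n"]

data_dict = {h: clean_value(v) for h, v in zip(header, row)}
print(json.dumps(data_dict))  # serializes without error
```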
Best answer
I tried to answer you, but there is one thing I don't understand: how do you retrieve the field names from the data? In your code I see you take them from a list called header, which starts out empty, and I don't see how it gets its values. Check my answer below; I'm not sure I have understood you correctly.
from elasticsearch import Elasticsearch
from elasticsearch import helpers

index_name = "your_index_name"
doc_type = "your_doc_type"
esConnector = Elasticsearch(["http://192.168.1.1:9200/"])  # change your ip here

def generate_data(csv_file_object):
    with open(csv_file_object, "r") as f:
        # first line holds the field names
        header = f.readline().rstrip().split(",")
        for count, line in enumerate(f):
            values = line.rstrip().split(",")
            data_dict = dict(zip(header, values))
            yield {
                '_op_type': 'index',
                '_index': index_name,
                '_type': doc_type,
                '_id': count + 1,
                '_source': data_dict
            }

for success, info in helpers.parallel_bulk(client=esConnector, actions=generate_data(csv_file_object), thread_count=4):
    if not success:
        print('Doc failed', info)
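As a variant of the generator above, the standard library's `csv.DictReader` reads the header row itself and handles quoted fields, so the field-name mapping comes for free. A sketch under the same assumptions (index name, doc type, and the `generate_actions` helper are illustrative):

```python
import csv

def generate_actions(csv_path, index_name, doc_type):
    # DictReader uses the first row as field names and yields one dict per row
    with open(csv_path, newline="") as f:
        for i, row in enumerate(csv.DictReader(f), start=1):
            yield {
                "_op_type": "index",
                "_index": index_name,
                "_type": doc_type,
                "_id": i,
                # strip whitespace and map missing cells to empty strings
                "_source": {k: (v or "").strip() for k, v in row.items()},
            }
```

The resulting generator can be passed to `helpers.parallel_bulk` exactly like `generate_data` above.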
A similar question about python - CSV to Elasticsearch with a SerializationError can be found on Stack Overflow: https://stackoverflow.com/questions/50002053/