我在将数据从Spark Streaming(PySpark)索引到Elasticserach时遇到问题。数据的类型为dstream。下面看起来

(u'01B', 0)
(u'1A5', 1)
....

这是我正在使用的 flex 索引:index = clus和type = data
GET /clus/_mapping/data
{
   "clus": {
      "mappings": {
         "data": {
            "properties": {
               "content": {
                  "type": "text"
               }
            }
         }
      }
   }
}

这是我的代码:
ES_HOST = {
    "host" : "localhost",
    "port" : 9200
}

INDEX_NAME = 'clus'
TYPE_NAME = 'data'
ID_FIELD = 'responseID'

# create ES client
es = Elasticsearch(hosts = [ES_HOST])

# some config before sending to elastic
if not es.indices.exists(INDEX_NAME):
    request_body = {
        "settings" : {
            "number_of_shards": 1,
            "number_of_replicas": 0
        }
    }
    res = es.indices.create(index = INDEX_NAME, body = request_body)
es_write_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": INDEX_NAME+"/"+TYPE_NAME
    }
sc = SparkContext(appName="PythonStreamingKafka")
    ssc = StreamingContext(sc, 30)

# .....
#loading data to put in elastic : lines4

    lines4.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf))




    ssc.start()
    ssc.awaitTermination()

这是错误:

最佳答案

您创建索引的方式似乎有误。创建索引时,您需要在请求的mapping中发送body。这是一个工作示例:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])
# create index
index_name = "clus"
index_mapping = {
   "clus": {
      "mappings": {
         "data": {
            "properties": {
               "content": {
                  "type": "text"
               }
            }
         }
      }
   }
}


if not es.indices.exists(index_name):
        res = es.indices.create(index=index_name, body=index_mapping)
        print res

您应该将此{u'acknowledged': True}作为答复,以确认您已创建索引。

然后,使用foreachRDD遍历数据dstream,并应用一个函数,该函数会将数据转换为json结构{"content": str((u'1A5', 1))}并对其进行索引,如下所示
doc = {"content": str((u'1A5', 1))}
res = es.index(index="clus", doc_type='data', body=doc)

附带说明一下,不建议将数据作为列表(u'1A5', 1)进行索引,否则您将很难在其他上下文中使用它,例如在kibana上进行可视化。

09-19 05:57