嗨,大家好,我正在与kafka合作> Spark Streaming> Elasticsearch。
但我不让SparkStream将JavaInputDStream JSON传输到elasticsearch。
我的代码:
SparkConf conf = new SparkConf()
.setAppName("Streaming")
.setMaster("local")
.set("es.nodes","localhost:9200")
.set("es.index.auto.create","true");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(5000));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "exastax");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("loglar");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, String> finisStream = stream.mapToPair(record -> new Tuple2<>("", record.value()));
finisStream.print();
JavaEsSparkStreaming.saveJsonToEs(finisStream,"spark/docs");
streamingContext.start();
streamingContext.awaitTermination();
}
JavaEsSparkStreaming.saveJsonToEs(finisStream,“spark / docs”); >> finisStream无法正常运行,因为它不是JavaDStream。
如何转换JavaDStream的?
最佳答案
JavaEsSparkStreaming.saveJsonToEs
与JavaDStream
一起使用JavaEsSparkStreaming.saveToEsWithMeta
与JavaPairDStream
一起使用
要修复您的代码:
JavaDStream<String> finisStream = stream.map(new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> stringStringTuple2) throws Exception {
return stringStringTuple2._2();
}
});
JavaEsSparkStreaming.saveJsonToEs(finisStream,"");