Hi everyone, I am working with Kafka > Spark Streaming > Elasticsearch.
But I can't get Spark Streaming to write the JSON from the JavaInputDStream to Elasticsearch.

My code:

    SparkConf conf = new SparkConf()
            .setAppName("Streaming")
            .setMaster("local")
            .set("es.nodes","localhost:9200")
            .set("es.index.auto.create","true");
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(5000));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "exastax");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("loglar");
    JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    JavaPairDStream<String, String> finisStream = stream.mapToPair(record -> new Tuple2<>("", record.value()));
    finisStream.print();
    JavaEsSparkStreaming.saveJsonToEs(finisStream,"spark/docs");
    streamingContext.start();
    streamingContext.awaitTermination();


}

JavaEsSparkStreaming.saveJsonToEs(finisStream, "spark/docs"); >> finisStream does not work here, because it is not a JavaDStream.
How can I convert it to a JavaDStream?

Best Answer

JavaEsSparkStreaming.saveJsonToEs is used with a JavaDStream.
JavaEsSparkStreaming.saveToEsWithMeta is used with a JavaPairDStream.
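
If you would rather keep a JavaPairDStream, saveToEsWithMeta treats the key of each pair as document metadata (for example the document ID). Below is a minimal sketch building on the stream from the question; using the Kafka record key as the document ID and reusing the "spark/docs" resource are assumptions, and since the values are already JSON strings, es.input.json would need to be set to "true" in the SparkConf:

    // assumption: .set("es.input.json", "true") was added to the SparkConf,
    // because the pair values are raw JSON strings
    JavaPairDStream<String, String> metaStream =
            stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
    // the pair key is used as the Elasticsearch document ID, the value as the document body
    JavaEsSparkStreaming.saveToEsWithMeta(metaStream, "spark/docs");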

To fix your code:

    // map each Kafka ConsumerRecord to its JSON value, producing a JavaDStream<String>
    JavaDStream<String> finisStream = stream.map(new Function<ConsumerRecord<String, String>, String>() {
        public String call(ConsumerRecord<String, String> record) throws Exception {
            return record.value();
        }
    });

    JavaEsSparkStreaming.saveJsonToEs(finisStream, "spark/docs");
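
Equivalently, the same conversion can be written with a Java 8 method reference (the "spark/docs" resource is the one from the question):

    // ConsumerRecord::value extracts the JSON payload from each Kafka record
    JavaDStream<String> jsonStream = stream.map(ConsumerRecord::value);
    JavaEsSparkStreaming.saveJsonToEs(jsonStream, "spark/docs");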
