本文介绍了Spark结构化流媒体无法接收kafka消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在测试使用kafka的Spark结构化流式传输.我在 host28 上具有一个 kafka-broker(0.10.1),默认分区编号为: num.partitions= 1

I am testing spark structured streaming use kafka.i have a kafka-broker(0.10.1) on host28,default partition num:num.partitions=1

我的制片人:

bin/kafka-console-producer.sh --broker-list host28:6667 --topic test

当我使用

bin/kafka-console-consumer.sh --zookeeper host26:2181,host27:2181,host28:2181 --topic test --from-beginning

bin/kafka-console-consumer.sh --bootstrap-server host8:6667 --topic test --from-beginning --partition 0

可以接收来自kafka的消息.

can recive message from kafka.

但是使用时

bin/kafka-console-consumer.sh --bootstrap-server host28:6667 --topic test --from-beginning

或Spark结构化的流媒体无法接收消息

or spark structured streaming can't recive message

public class Main {
    private static String APP_NAE = "test_streaming_from_kafka";
    private static String KAFKA_HOST = "host28:6667";
    private static String KAFKA_SUBSCRIBE = "test";
    public static void main(String[] args) throws Exception {

        SparkSession spark = SparkSession
                .builder()
                .appName(APP_NAE)
                .getOrCreate();

        DataStreamReader reader = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", KAFKA_HOST)
                .option("subscribe", KAFKA_SUBSCRIBE);

        StreamingQuery query = reader.load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "CAST(partition AS STRING)", "CAST(offset AS STRING)")
                .writeStream()
                .format("console")
                .start();

        query.awaitTermination();
    }
}

推荐答案

已解决!

我将Spark日志从 INFO 更改为 DEBUG ,然后我发现了:

I changed spark log from INFO to DEBUG,then i found this:

google 组协调器不可用

这篇关于Spark结构化流媒体无法接收kafka消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 10:17