I have written a topic named my-topic to Kafka, and I am trying to read that topic's data in Spark. However, I am having trouble displaying the Kafka topic's records, and the list of errors keeps growing. I am using Java to fetch the data.

Below is my code:

public static void main(String[] args) throws InterruptedException {
    // Local Spark Streaming context with a 10-second batch interval
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    // Kafka consumer settings
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "Different id is allotted for different stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    Collection<String> topics = Arrays.asList("my-topic");

    // Direct stream subscribed to the my-topic topic
    final JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

    // Map each ConsumerRecord to a (key, value) pair
    JavaPairDStream<String, String> jPairDStream = stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    return new Tuple2<>(record.key(), record.value());
                }
            });

    // Print each record's key and value for every micro-batch
    jPairDStream.foreachRDD(jPairRDD -> {
        jPairRDD.foreach(pair -> {
            System.out.println("key=" + pair._1() + " value=" + pair._2());
        });
    });

    jssc.start();
    jssc.awaitTermination();

}


Below is the error I am getting:


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/09/04 11:41:15 INFO SparkContext: Running Spark version 2.1.0
17/09/04 11:41:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/04 11:41:15 INFO SecurityManager: Changing view acls to: 11014525
17/09/04 11:41:15 INFO SecurityManager: Changing modify acls to: 11014525
17/09/04 11:41:15 INFO SecurityManager: Changing view acls groups to:
17/09/04 11:41:15 INFO SecurityManager: Changing modify acls groups to:
17/09/04 11:41:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(11014525); groups with view permissions: Set(); users with modify permissions: Set(11014525); groups with modify permissions: Set()
17/09/04 11:41:15 INFO Utils: Successfully started service 'sparkDriver' on port 56668.
17/09/04 11:41:15 INFO SparkEnv: Registering MapOutputTracker
17/09/04 11:41:15 INFO SparkEnv: Registering BlockManagerMaster
17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/09/04 11:41:15 INFO DiskBlockManager: Created local directory at C:\Users\11014525\AppData\Local\Temp\blockmgr-cba489b9-2458-455a-8c03-4c4395a01d44
17/09/04 11:41:15 INFO MemoryStore: MemoryStore started with capacity 896.4 MB
17/09/04 11:41:16 INFO SparkEnv: Registering OutputCommitCoordinator
17/09/04 11:41:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/09/04 11:41:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.16.202.21:4040
17/09/04 11:41:16 INFO Executor: Starting executor ID driver on host localhost
17/09/04 11:41:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56689.
17/09/04 11:41:16 INFO NettyBlockTransferService: Server created on 172.16.202.21:56689
17/09/04 11:41:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/09/04 11:41:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManagerMasterEndpoint: Registering block manager 172.16.202.21:56689 with 896.4 MB RAM, BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 WARN KafkaUtils: overriding enable.auto.commit to false for executor
17/09/04 11:41:16 WARN KafkaUtils: overriding auto.offset.reset to none for executor
17/09/04 11:41:16 WARN KafkaUtils: overriding executor group.id to spark-executor-Different id is allotted for different stream
17/09/04 11:41:16 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@23a3407b
17/09/04 11:41:16 INFO MappedDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO MappedDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO MappedDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO MappedDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@140030a9
17/09/04 11:41:16 INFO ForEachDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO ForEachDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO ForEachDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO ForEachDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@65041548
17/09/04 11:41:16 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
    at Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56)
17/09/04 11:41:16 INFO ReceiverTracker: ReceiverTracker stopped
17/09/04 11:41:16 INFO JobGenerator: Stopping JobGenerator immediately
17/09/04 11:41:16 INFO RecurringTimer: Stopped timer for JobGenerator after time -1
17/09/04 11:41:16 INFO JobGenerator: Stopped JobGenerator
17/09/04 11:41:16 INFO JobScheduler: Stopped JobScheduler
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
    at Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56)
17/09/04 11:41:16 INFO SparkContext: Invoking stop() from shutdown hook
17/09/04 11:41:16 INFO SparkUI: Stopped Spark web UI at http://172.16.202.21:4040
17/09/04 11:41:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/09/04 11:41:16 INFO MemoryStore: MemoryStore cleared
17/09/04 11:41:16 INFO BlockManager: BlockManager stopped
17/09/04 11:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
17/09/04 11:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/09/04 11:41:16 INFO SparkContext: Successfully stopped SparkContext
17/09/04 11:41:16 INFO ShutdownHookManager: Shutdown hook called
17/09/04 11:41:16 INFO ShutdownHookManager: Deleting directory C:\Users\11014525\AppData\Local\Temp\spark-37334cdc-9680-4801-8e50-ef3024ed1d8a


pom.xml

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
        <version>2.1.1</version>
</dependency>
Best answer

From the logs, your Spark version is 2.1.0. You have not shared the build file with the other dependencies. It looks like you have both spark-streaming-kafka-0-8_2.11-2.1.0.jar and spark-streaming-kafka-0-10_2.11-2.1.0.jar on the classpath, and the wrong class is being loaded. If you are using Maven, you need dependencies as shown below. Please check and update your project.

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
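
If the error persists after updating these dependencies, it can help to confirm which Kafka integration jars actually end up on the classpath. Assuming a standard Maven build, one quick check (a suggestion, not part of the original answer's steps) is to print the dependency tree:

mvn dependency:tree

Scan the output for any remaining spark-streaming-kafka-0-8 or 0.8.x kafka entries; if one shows up, remove or exclude whichever dependency pulls it in.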


EDIT

Since you have edited the question and posted your dependencies, I am editing my answer. You are using Kafka version 0.8.*, while your spark-streaming-kafka dependency targets 0.10.*. Please use the same version for the Kafka dependency. Use the following dependency for org.apache.kafka:

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
</dependency>
