我已经在Kafka中将主题写为my-topic
,并且试图在spark中获取主题的信息。但是由于出现的错误列表越来越长,我在显示Kafka主题详细信息时遇到了一些困难。我正在使用Java来获取数据。
下面是我的代码:
public static void main(String s[]) throws InterruptedException{
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "Different id is allotted for different stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("my-topic");
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, String> jPairDStream = stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
return new Tuple2<>(record.key(), record.value());
}
});
jPairDStream.foreachRDD(jPairRDD -> {
jPairRDD.foreach(rdd -> {
System.out.println("key="+rdd._1()+" value="+rdd._2());
});
});
jssc.start();
jssc.awaitTermination();
stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
return new Tuple2<>(record.key(), record.value());
}
});
}
以下是我遇到的错误:
使用Spark的默认log4j配置文件:
org / apache / spark / log4j-defaults.properties 17/09/04 11:41:15 INFO
SparkContext:运行Spark版本2.1.0 17/09/04 11:41:15警告
NativeCodeLoader:无法为您的本地Hadoop库加载
平台...在适用的情况下使用内置Java类04/9/17
11:41:15 INFO SecurityManager:将视图ACL更改为:11014525
04/9/17 11:41:15 INFO SecurityManager:将修改ACL更改为:
11014525 17/09/04 11:41:15 INFO SecurityManager:更改视图ACL
组到:17/09/04 11:41:15 INFO SecurityManager:更改修改
ACL组到:04/17/09 11:41:15 INFO SecurityManager:
SecurityManager:身份验证已禁用; ui acls已禁用;使用者
具有查看权限:Set(11014525);具有查看权限的组:
组();具有修改权限的用户:Set(11014525);与
修改权限:Set()04/09/04 11:41:15 INFO实用工具:成功
在端口56668上启动了服务'sparkDriver'。04/17/09信息
SparkEnv:注册MapOutputTracker 17/09/04 11:41:15 INFO
SparkEnv:注册BlockManagerMaster 17/09/04 11:41:15 INFO
BlockManagerMasterEndpoint:使用
org.apache.spark.storage.DefaultTopologyMapper用于获取拓扑
信息17/09/04 11:41:15 INFO BlockManagerMasterEndpoint:
BlockManagerMasterEndpoint向上17/09/04 11:41:15 INFO DiskBlockManager:
在以下位置创建本地目录
C:\ Users \ 11014525 \ AppData \ Local \ Temp \ blockmgr-cba489b9-2458-455a-8c03-4c4395a01d44
04/9/17 11:41:15信息MemoryStore:MemoryStore开始具有容量
896.4 MB 17/09/04 11:41:16 INFO SparkEnv:注册OutputCommitCoordinator 17/09/04 11:41:16 INFO实用程序:成功
在端口4040上启动了服务'SparkUI'。17/09/04 11:41:16 INFO
SparkUI:将SparkUI绑定到0.0.0.0,并从开始
http://172.16.202.21:4040 04/09/17 11:41:16 INFO执行程序:正在启动
主机本地主机上的执行程序ID驱动程序17/09/04 11:41:16 INFO实用程序:
成功启动服务
端口上的“ org.apache.spark.network.netty.NettyBlockTransferService”
56689. 17/09/04 11:41:16 INFO NettyBlockTransferService:服务器创建于172.16.202.21:56689 17/09/04 11:41:16 INFO BlockManager:
使用org.apache.spark.storage.RandomBlockReplicationPolicy进行阻止
复制策略17/09/04 11:41:16 INFO BlockManagerMaster:
注册BlockManager BlockManagerId(驱动程序,172.16.202.21,56689,
无)17/09/04 11:41:16 INFO BlockManagerMasterEndpoint:正在注册
块管理器172.16.202.21:56689具有896.4 MB RAM,
BlockManagerId(驱动程序,172.16.202.21,56689,无)17/09/04 11:41:16
INFO BlockManagerMaster:已注册的BlockManager
BlockManagerId(驱动程序,172.16.202.21,56689,无)17/09/04 11:41:16
信息BlockManager:初始化BlockManager:BlockManagerId(驱动程序,
172.16.202.21、56689,无)17/09/04 11:41:16警告KafkaUtils:对执行者覆盖enable.auto.commit为false 17/09/04 11:41:16
WARF KafkaUtils:对于执行程序,将auto.offset.reset覆盖为none
04/9/17 11:41:16警告KafkaUtils:将执行者group.id覆盖为
为不同的流分配了spark-executor-Different id 17/09/04
11:41:16警告KafkaUtils:将receive.buffer.bytes覆盖为65536
KAFKA-3135 17/09/04 11:41:16 INFO DirectKafkaInputDStream:幻灯片时间
= 10000毫秒17/09/04 11:41:16 INFO DirectKafkaInputDStream:存储级别=序列化的1x已复制17/09/04 11:41:16 INFO
DirectKafkaInputDStream:检查点间隔= null 17/09/04 11:41:16
INFO DirectKafkaInputDStream:记住间隔= 10000毫秒04年9月17日
11:41:16 INFO DirectKafkaInputDStream:初始化并验证
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@23a3407b
17/09/04 11:41:16 INFO MappedDStream:幻灯片时间= 10000毫秒17/09/04
11:41:16 INFO MappedDStream:存储级别=序列化1x已复制
17/09/04 11:41:16 INFO MappedDStream:检查点间隔= null
17/09/04 11:41:16 INFO MappedDStream:记住间隔= 10000毫秒
17/09/04 11:41:16 INFO MappedDStream:初始化并验证
org.apache.spark.streaming.dstream.MappedDStream@140030a9 04/09/17
11:41:16 INFO ForEachDStream:滑动时间= 10000毫秒04/9/17
INFO ForEachDStream:存储级别=序列化1x复制17/09/04
11:41:16 INFO ForEachDStream:检查点间隔=空17/09/04
11:41:16 INFO ForEachDStream:记住间隔= 10000毫秒04/9/17
11:41:16 INFO ForEachDStream:初始化并验证
org.apache.spark.streaming.dstream.ForEachDStream@65041548 04/09/17
11:41:16错误StreamingContext:启动上下文时出错,标记为
它已停止org.apache.kafka.common.config.ConfigException:丢失
所需的配置“ partition.assignment.strategy”,该配置没有
默认值。在
org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)在
org.apache.kafka.common.config.AbstractConfig。(AbstractConfig.java:48)
在
org.apache.kafka.clients.consumer.ConsumerConfig。(ConsumerConfig.java:194)
在
org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:380)
在
org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:363)
在
org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:350)
在
org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
在
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)在
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
在
org.apache.spark.streaming.DStreamGraph $$ anonfun $ start $ 5.apply(DStreamGraph.scala:49)
在
org.apache.spark.streaming.DStreamGraph $$ anonfun $ start $ 5.apply(DStreamGraph.scala:49)
在
scala.collection.parallel.mutable.ParArray $ ParArrayIterator.foreach_quick(ParArray.scala:143)
在
scala.collection.parallel.mutable.ParArray $ ParArrayIterator.foreach(ParArray.scala:136)
在
scala.collection.parallel.ParIterableLike $ Foreach.leaf(ParIterableLike.scala:972)
在
scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply $ mcV $ sp(Tasks.scala:49)
在
scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48)
在
scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48)
在scala.collection.parallel.Task $ class.tryLeaf(Tasks.scala:51)在
scala.collection.parallel.ParIterableLike $ Foreach.tryLeaf(ParIterableLike.scala:969)
在
scala.collection.parallel.AdaptiveWorkStealingTasks $ WrappedTask $ class.compute(Tasks.scala:152)
在
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks $ WrappedTask.compute(Tasksscala:443)
在
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
在
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
在
scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339)
在
scala.current.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在
scala.current.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
在...使用org.apache.spark.util.ThreadUtils在单独的线程中运行
... () 在
org.apache.spark.streaming.StreamingContext.liftedTree1 $ 1(StreamingContext.scala:578)
在
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
在
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
在
Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56)
17/09/04 11:41:16信息ReceiverTracker:ReceiverTracker已停止
17/09/04 11:41:16 INFO JobGenerator:立即停止JobGenerator
17/09/04 11:41:16 INFO RecurringTimer:JobGenerator的停止的计时器
时间-1 17/09/04后11:41:16 INFO JobGenerator:已停止
JobGenerator 17/09/04 11:41:16 INFO JobScheduler:停止JobScheduler
线程“主”中的异常
org.apache.kafka.common.config.ConfigException:缺少必需项
没有默认设置的配置“ partition.assignment.strategy”
值。在
org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)在
org.apache.kafka.common.config.AbstractConfig。(AbstractConfig.java:48)
在
org.apache.kafka.clients.consumer.ConsumerConfig。(ConsumerConfig.java:194)
在
org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:380)
在
org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:363)
在
org.apache.kafka.clients.consumer.KafkaConsumer。(KafkaConsumer.java:350)
在
org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
在
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)在
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
在
org.apache.spark.streaming.DStreamGraph $$ anonfun $ start $ 5.apply(DStreamGraph.scala:49)
在
org.apache.spark.streaming.DStreamGraph $$ anonfun $ start $ 5.apply(DStreamGraph.scala:49)
在
scala.collection.parallel.mutable.ParArray $ ParArrayIterator.foreach_quick(ParArray.scala:143)
在
scala.collection.parallel.mutable.ParArray $ ParArrayIterator.foreach(ParArray.scala:136)
在
scala.collection.parallel.ParIterableLike $ Foreach.leaf(ParIterableLike.scala:972)
在
scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply $ mcV $ sp(Tasks.scala:49)
在
scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48)
在
scala.collection.parallel.Task $$ anonfun $ tryLeaf $ 1.apply(Tasks.scala:48)
在scala.collection.parallel.Task $ class.tryLeaf(Tasks.scala:51)在
scala.collection.parallel.ParIterableLike $ Foreach.tryLeaf(ParIterableLike.scala:969)
在
scala.collection.parallel.AdaptiveWorkStealingTasks $ WrappedTask $ class.compute(Tasks.scala:152)
在
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks $ WrappedTask.compute(Tasksscala:443)
在
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
在
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
在
scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339)
在
scala.current.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在
scala.current.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
在...使用org.apache.spark.util.ThreadUtils在单独的线程中运行
... () 在
org.apache.spark.streaming.StreamingContext.liftedTree1 $ 1(StreamingContext.scala:578)
在
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
在
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
在
Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56)
17/09/04 11:41:16 INFO SparkContext:从关机调用stop()
挂钩17/09/04 11:41:16信息SparkUI:在以下位置停止了Spark Web UI
http://172.16.202.21:4040 17/09/04 11:41:16 INFO
MapOutputTrackerMasterEndpoint:MapOutputTrackerMasterEndpoint
停了! 17/09/04 11:41:16 INFO MemoryStore:MemoryStore已清除
17/09/04 11:41:16 INFO BlockManager:BlockManager停止了17/09/04
11:41:16 INFO BlockManagerMaster:BlockManagerMaster停止了17/09/04
11:41:16 INFO OutputCommitCoordinator $ OutputCommitCoordinator端点:
OutputCommitCoordinator已停止! 17/09/04 11:41:16 INFO SparkContext:
成功停止了SparkContext 17/09/04 11:41:16 INFO
ShutdownHookManager:关机钩称为17/09/04 11:41:16 INFO
ShutdownHookManager:删除目录
C:\ Users \ 11014525 \ AppData \ Local \ Temp \ spark-37334cdc-9680-4801-8e50-ef3024ed1d8a
pom.xml
org.apache.spark
spark-streaming_2.11
2.1.0
公地语言
公地语言
2.6
org.apache.kafka
kafka_2.10
0.8.2.0
org.apache.spark
spark-streaming-kafka-0-10_2.10
2.1.1
最佳答案
从日志中,您的Spark版本为2.1.0。您尚未共享具有其他依赖项的构建文件。看起来您在classpath中同时具有spark-streaming-kafka-0-8_2.11-2.1.0.jar
和spark-streaming-kafka-0-10_2.11-2.1.0.jar
,并且正在加载错误的类。如果您使用的是maven,则需要如下所示的依赖项。请检查并更新您的项目。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>
编辑
当您编辑问题并发布相关性后,我正在编辑我的答案。您正在使用Kafka版本
0.8.*
,而spark-streaming-kafka版本为0.10.*
。请为Kafka依赖项使用相同版本。请对org.apache.kafka
使用以下依赖项<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
</dependency>