当我们uisng在Apache中Saprk

当我们uisng在Apache中Saprk

本文介绍了找不到设置领袖([TOPICNNAME,0))当我们uisng在Apache中Saprk的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在使用Apache 1.5.1星火和kafka_2.10-0.8.2.1和卡夫卡DirectStream API从卡夫卡用星火获取数据。

We are using the Apache Spark 1.5.1 and kafka_2.10-0.8.2.1 and Kafka DirectStream API to fetch data from Kafka using Spark.

我们创造了卡夫卡的主题具有以下设置

We created the topics in Kafka with the following settings

ReplicationFactor:1和副本:1

ReplicationFactor :1 and Replica : 1

在所有的卡夫卡实例正在运行,星火就业工作正常。当群集中的卡夫卡实例之一是向下,然而,我们得到以下再现的异常。一段时间后,重新启动我们残疾人卡夫卡的实例,并试图完成星火工作,但火花已经终止,因为异常的。由于这个原因,我们不能在卡夫卡主题读取剩余消息

When all of the Kafka instances are running, the Spark job works fine. When one of the Kafka instances in the cluster is down, however, we get the exception reproduced below. After some time, we restarted the disabled Kafka instance and tried to finish the Spark job, but Spark was had already terminated because of the exception. Because of this, we could not read the remaining messages in the Kafka topics.

ERROR DirectKafkaInputDStream:125 - ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

在此先感谢。请帮助解决这个问题。

Thanks in advance. Please help to resolve this issue.

推荐答案

这是预期的行为。您已要求每个主题设置ReplicationFactor一个存储在一台机器上。当一台机器出现这种情况存储的话题归-tenant4被取下来,消费者无法找到主题的领导者。

This is expected behaviour. You have requested that each topic be stored on one machine by setting ReplicationFactor to one. When the one machine that happens to store the topic normalized-tenant4 is taken down, the consumer cannot find the leader of the topic.

请参阅

这篇关于找不到设置领袖([TOPICNNAME,0))当我们uisng在Apache中Saprk的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 16:59