This article describes how to handle the error that occurs when Spark Structured Streaming disconnects from Zookeeper while reading from Kafka. It should serve as a useful reference for anyone facing the same problem.

Problem Description

My Spark Structured Streaming job keeps disconnecting from Zookeeper when trying to read from a Kafka topic:

WARN clients.NetworkClient: Bootstrap broker [zk host]:2181 disconnected

When I check the ZK logs, I see this error being logged all the time:

Exception causing close of session 0x0 due to java.io.EOFException

I'm running on Cloudera 5.11 with Spark 2.1; these are my SBT libraries:

val sparkVer = "2.1.0"
Seq(
  "org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
  "org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVer % "provided",
  "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % sparkVer
)

This is my submit command:

# Set KAFKA to 0.10 see (https://community.cloudera.com/t5/Data-Ingestion-Integration/KafkaConsumer-subscribe-0-9-vs-0-10-in-Structured-streaming/td-p/60161)
export SPARK_KAFKA_VERSION=0.10
spark2-submit  --class myMainClass --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 myapp.jar topic2345 [zk host 1]:2181,[zk host 2]:2181

This is the code that creates the stream:

  private def createKafkaStream(spark: SparkSession, args: Array[String]) = {
    // args(0) = topic name, args(1) = host list from the submit command
    // (which, as submitted above, is the Zookeeper quorum)
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", args(1))
      .option("subscribe", args(0))
      .load()
  }

After activating the DEBUG output, this is the complete error stack:

java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$org$apache$spark$sql$kafka010$KafkaSource$$fetchLatestOffsets$1.apply(KafkaSource.scala:374)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$org$apache$spark$sql$kafka010$KafkaSource$$fetchLatestOffsets$1.apply(KafkaSource.scala:372)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaSource.scala:442)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$withRetriesWithoutInterrupt$1.apply(KafkaSource.scala:441)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$withRetriesWithoutInterrupt$1.apply(KafkaSource.scala:441)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:79)
at org.apache.spark.sql.kafka010.KafkaSource.withRetriesWithoutInterrupt(KafkaSource.scala:440)
at org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$fetchLatestOffsets(KafkaSource.scala:372)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:141)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:138)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:138)
at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:121)
at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:157)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$9$$anonfun$apply$5.apply(StreamExecution.scala:391)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$9$$anonfun$apply$5.apply(StreamExecution.scala:391)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:265)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$9.apply(StreamExecution.scala:390)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$9.apply(StreamExecution.scala:388)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:388)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:362)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:260)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:257)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:257)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:265)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:257)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:187)
18/03/21 10:47:27 DEBUG clients.NetworkClient: Node -2 disconnected.
18/03/21 10:47:27 WARN clients.NetworkClient: Bootstrap broker [zk host]:2181 disconnected
18/03/21 10:47:27 DEBUG clients.NetworkClient: Sending metadata request {topics=[topic2345]} to node -1
18/03/21 10:47:27 DEBUG network.Selector: Connection with /[zk host] disconnected

Recommended Answer

kafka.bootstrap.servers takes a list of Kafka brokers, not a Zookeeper quorum. The client above is handed [zk host]:2181 and tries to speak the Kafka protocol to Zookeeper, which closes the connection — hence the EOFException on both sides.

The "new" Kafka Consumer API (which the spark-sql-kafka-0-10 source uses) does not take a Zookeeper connection string at all.

This concludes the article on the error when connecting Spark Structured Streaming to Zookeeper. We hope the recommended answer helps you solve the problem.
