本文介绍了星火流卡夫卡流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一些问题,同时试图从卡夫卡火花流读取。
I'm having some issues while trying to read from kafka with spark streaming.
我的code是:
My code is:
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaParams = Map[String, String](
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "consumergroup",
"metadata.broker.list" -> "localhost:9092",
"zookeeper.connection.timeout.ms" -> "10000"
//"kafka.auto.offset.reset" -> "smallest"
)
val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
我previously在端口9092开始在端口2181和卡夫卡服务器0.9.0.0饲养员。
但我得到星火驱动程序出现以下错误:
I previously started zookeeper at port 2181 and Kafka server 0.9.0.0 at port 9092.But I get the following error in the Spark driver:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
动物园管理员日志:
Zookeeper log:
[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
任何暗示?
非常感谢你。
推荐答案
这个问题是有关错误的火花流 - 卡夫卡的版本。
The problem was related the wrong spark-streaming-kafka version.
如documentation
卡夫卡:星火流1.5.2与卡夫卡0.8.2.1兼容
所以,包括
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>
在我的pom.xml(而不是0.9.0.0版本)解决了这个问题。
in my pom.xml (instead of version 0.9.0.0) solved the issue.
希望这有助于
这篇关于星火流卡夫卡流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!