本文介绍了星火流卡夫卡流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一些问题,同时试图从卡夫卡火花流读取。

I'm having some issues while trying to read from kafka with spark streaming.

我的code是:

My code is:

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "consumergroup",
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connection.timeout.ms" -> "10000"
  //"kafka.auto.offset.reset" -> "smallest"
)

val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

我previously在端口9092开始在端口2181和卡夫卡服务器0.9.0.0饲养员。
但我得到星火驱动程序出现以下错误:

I previously started zookeeper at port 2181 and Kafka server 0.9.0.0 at port 9092.But I get the following error in the Spark driver:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)

动物园管理员日志:

Zookeeper log:

[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

任何暗示?

非常感谢你。

推荐答案

这个问题是有关错误的火花流 - 卡夫卡的版本。

The problem was related the wrong spark-streaming-kafka version.

如documentation

卡夫卡:星火流1.5.2与卡夫卡0.8.2.1兼容

所以,包括

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

在我的pom.xml(而不是0.9.0.0版本)解决了这个问题。

in my pom.xml (instead of version 0.9.0.0) solved the issue.

希望这有助于

这篇关于星火流卡夫卡流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 16:59