Question
I have just begun with Spark Streaming and I am trying to build a sample application that counts words from a Kafka stream. Although it compiles with sbt package, when I run it I get a NoClassDefFoundError. This post seems to describe the same problem, but the solution is for Maven and I have not been able to reproduce it with sbt.
KafkaApp.scala:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "zookeeper.connection.timeout.ms" -> "10000",
      "group.id" -> "sparkGroup"
    )
    val topics = Map(
      "test" -> 1
    )
    // stream of (key, message) pairs read from Kafka
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)
    // count() yields a DStream, not a number, so print the per-batch count
    messages.count().print()
    // nothing is consumed until the streaming context is started
    ssc.start()
    ssc.awaitTermination()
  }
}
build.sbt:
name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.1",
  "org.apache.spark" %% "spark-streaming" % "1.1.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
Then I submit it:
bin/spark-submit \
--class "KafkaApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.1.jar
Error:
14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
    at KafkaApp$.main(KafkaApp.scala:28)
    at KafkaApp.main(KafkaApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
Recommended answer
spark-submit does not automatically put the package containing KafkaUtils on the classpath, and sbt package builds a JAR that contains only your own classes, so the class is missing at runtime. You need to include it in your project JAR. To do that, create an all-inclusive uber-jar using sbt assembly. Here is an example build.sbt:
https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt
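Applied to the build.sbt from the question, the change could look roughly like the sketch below. It is not copied from the linked file. It assumes the sbt-assembly plugin is enabled for the project, uses the sbt 0.13-era `key in task` syntax, and the JAR name kafka-app-assembly.jar and the merge strategy are illustrative choices rather than requirements.

```scala
name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Supplied by the Spark installation at runtime, so left out of the uber-jar
  "org.apache.spark" %% "spark-core" % "1.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.1" % "provided",
  // Not part of the Spark distribution, so it has to be bundled
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

// Hypothetical output name, chosen here only to keep the spark-submit path predictable
assemblyJarName in assembly := "kafka-app-assembly.jar"

// Assumption: a blunt way to resolve duplicate-file conflicts between dependencies.
// Discarding META-INF also drops service registrations, which some libraries need.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```

Marking spark-core and spark-streaming as "provided" keeps the uber-jar small and avoids shipping a second copy of Spark, while spark-streaming-kafka stays in the default scope so that KafkaUtils ends up inside the JAR.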
You also need to add the assembly plugin to sbt:
https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
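The plugin is usually declared in an .sbt file under the project/ directory. A minimal sketch, assuming the file is named project/plugins.sbt and using version 0.14.10 purely for illustration; pick the sbt-assembly release that matches your sbt version:

```scala
// project/plugins.sbt (the file name is a convention; the version is an assumption)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
```

With the plugin in place, running sbt assembly instead of sbt package should produce the uber-jar under target/scala-2.10/, and that JAR, rather than simple-project_2.10-1.1.jar, is the one to pass to spark-submit.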