data to HDFS by converting it to a dataframe:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkHdfs {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Kafka consumer settings
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    // Word count over each 20-second batch
    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF()
      // ... (the HDFS write and the rest of main are cut off in the original post)
    })
  }
}
The folder is created, but the file is not written.
The program terminates with the following error:
18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.lang.Thread.run(Thread.java:748)
18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
In my pom.xml I am using the following dependencies:
- spark-core_2.11
- spark-sql_2.11
- spark-streaming_2.11
- spark-streaming-kafka-0-10_2.11
The error is caused by trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is mostly used for testing purposes, and its use is discouraged. The solution to your problem is therefore to make sure that the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create a SQLContext, which is fine. However, it is not used when creating the StreamingContext; the SparkConf is used instead.
In other words, passing a SparkConf as the parameter causes a new SparkContext to be created. Now there are two separate contexts.
The easiest solution here is to continue using the same context as before. Change the line that creates the StreamingContext to:
val ssc = new StreamingContext(sc, Seconds(20))
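To see that this change really leaves only one context in play, a small check along the following lines can help. This is a minimal sketch rather than part of the original program: the object name SharedContextCheck is made up for illustration, and it assumes a local Spark installation with spark-core, spark-sql and spark-streaming on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical helper object, only used to illustrate context sharing.
object SharedContextCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SharedContextCheck")
    val sc = new SparkContext(conf)

    // Both higher-level contexts are built on top of the existing SparkContext.
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(20))

    // Reference equality: all three handles point at the same SparkContext.
    assert(sqlContext.sparkContext eq sc)
    assert(ssc.sparkContext eq sc)

    // By contrast, `new StreamingContext(conf, Seconds(20))` would build a
    // second SparkContext internally, which is what the question's code does.

    sc.stop()
  }
}
```

With a single shared context there is no need to set spark.driver.allowMultipleContexts at all.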
Note: In newer versions of Spark (2.0+), use SparkSession instead. A new streaming context can then be created using StreamingContext(spark.sparkContext, ...). It can look as follows:
val spark = SparkSession.builder
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
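Putting the pieces together, the whole job on Spark 2.0+ might look roughly like the sketch below. The Kafka settings and topic are taken from the question. Several details are assumptions, because the write step is not shown in the question: the object name KafkaSparkHdfsSession, the column names, the Parquet output format, the append mode, and the HDFS URL hdfs://localhost:9000/tmp/wordcount are all placeholders. It also needs a running Kafka broker and HDFS to actually execute.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical object name; a sketch, not the asker's actual program.
object KafkaSparkHdfsSession {
  def main(args: Array[String]): Unit = {
    // One SparkSession, and therefore one SparkContext, for the whole job.
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("SparkKafka")
      .getOrCreate()
    import spark.implicits._

    // Reuse the session's SparkContext instead of passing a SparkConf.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("fridaydata"), kafkaParams)
    )

    val wordCount = stream
      .map(_.value)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    wordCount.foreachRDD { rdd =>
      // Skip empty batches so no empty output files are produced.
      if (!rdd.isEmpty()) {
        rdd.toDF("word", "count")           // assumed column names
          .write
          .mode(SaveMode.Append)            // assumed: append each batch
          .parquet("hdfs://localhost:9000/tmp/wordcount") // placeholder path
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Appending per batch keeps earlier results on disk. Whether that or overwriting is right depends on how the output is consumed downstream.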
