Problem description
I am writing Spark Streaming data to HDFS by converting it to a DataFrame:
Code
object KafkaSparkHdfs {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF()
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
The folder is created but the file is not written.
The program is getting terminated with the following error:
18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.lang.Thread.run(Thread.java:748)
18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
In my pom I am using the following dependencies:
- spark-core_2.11
- spark-sql_2.11
- spark-streaming_2.11
- spark-streaming-kafka-0-10_2.11
Recommended answer
The error is due to trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is mostly used for testing purposes and its use is discouraged. The solution to your problem is therefore to make sure that the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create a SQLContext, which is fine. However, when creating the StreamingContext it is not used; the SparkConf is used instead.
Looking at the documentation, the StreamingContext(conf: SparkConf, batchDuration: Duration) constructor creates a new SparkContext from the given configuration.
In other words, by using a SparkConf as the parameter, a new SparkContext will be created. There are now two separate contexts.
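Concretely, the constructor used in the question creates that second context under the hood. A minimal illustration, using the names from the code above:

// This overload builds a brand-new SparkContext internally from the given SparkConf,
// on top of the sc that was already created at object level:
val ssc = new StreamingContext(sparkConf, Seconds(20))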
The easiest solution here is to keep using the same context as before. Change the line that creates the StreamingContext to:
val ssc = new StreamingContext(sc, Seconds(20))
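With this change only one SparkContext exists in the application, so the spark.driver.allowMultipleContexts setting (and the sparkConf.set line that enables it) is no longer needed and can be removed.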
Note: in newer versions of Spark (2.0+), use SparkSession instead. A new streaming context can then be created with StreamingContext(spark.sparkContext, ...). It can look as follows:
val spark = SparkSession.builder
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()

import spark.implicits._

val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
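Putting it all together, a minimal sketch of the streaming job from the question rewritten around a single SparkSession could look like this (untested; it reuses the Kafka settings, topic and output path from the question, and names the DataFrame columns word and count purely for readability):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaSparkHdfs {
  def main(args: Array[String]): Unit = {
    // One SparkSession; its SparkContext is reused for the StreamingContext,
    // so no allowMultipleContexts workaround is needed.
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("SparkKafka")
      .getOrCreate()
    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("fridaydata"), kafkaParams)
    )

    val wordCounts = stream
      .map(_.value)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    wordCounts.foreachRDD { rdd =>
      // toDF comes from spark.implicits._ and uses the single shared SparkSession.
      val df = rdd.toDF("word", "count")
      df.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24") // same output path as in the question
    }

    ssc.start()
    ssc.awaitTermination()
  }
}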