我正在尝试从本地计算机(OSX)上的文件夹中流式传输CSV文件。我像这样一起使用SparkSession和StreamingContext:
val sc: SparkContext = createSparkContext(sparkContextName)
val sparkSess = SparkSession.builder().config(sc.getConf).getOrCreate()
val ssc = new StreamingContext(sparkSess.sparkContext, Seconds(time))
val csvSchema = new StructType().add("field_name",StringType)
val inputDF = sparkSess.readStream.format("org.apache.spark.csv").schema(csvSchema).csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")
如果在此之后运行
ssc.start()
,则会出现此错误:java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
相反,如果我尝试像这样启动
SparkSession
:inputDF.writeStream.format("console").start()
我得到:
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
显然,我不理解
SparkSession
和StreamingContext
应该如何一起工作。如果摆脱了SparkSession
,则StreamingContext
仅具有textFileStream
,需要在其上施加CSV模式。感谢您对如何使它正常工作的任何澄清。 最佳答案
您不能同时具有Spark会话和Spark上下文。随着Spark 2.0.0的发布,开发人员可以使用一个新的抽象-Spark Session-可以像以前可用的Spark Context一样实例化并调用它。
您仍然可以从spark会话构建器访问spark上下文:
val sparkSess = SparkSession.builder().appName("My App").getOrCreate()
val sc = sparkSess.sparkContext
val ssc = new StreamingContext(sc, Seconds(time))
导致工作失败的另一件事是您正在执行转换,而未调用任何操作。最后应该调用一些动作,例如inputDF.show()