// StructuredStreaming简单的例子(NewAPI)(wordCount)
package com.briup.streaming.structed

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/** Minimal Structured Streaming word count over a text socket.
  *
  * Connects to `localhost:9999`, splits every received line on single spaces,
  * and prints the running count of each word to the console. A server must
  * already be listening on that port (for example `nc -lk 9999`), because the
  * socket source connects to it as a client.
  */
object SocketSourceMyTest {

  /** Builds the streaming query, starts it, and blocks until it terminates.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    // Raise the level of every logger under "org" to WARN so the console
    // output of the query is not buried in framework logging.
    Logger.getLogger("org").setLevel(Level.WARN)

    // 1. As with Spark SQL, everything starts from a SparkSession.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SocketSourceMyTest")
      .getOrCreate()
    // Brings the implicit encoders required by flatMap below into scope.
    import spark.implicits._

    try {
      // 2. Read the stream from a data source (here: a text socket).
      //    includeTimestamp adds a timestamp column; only "value" is used below.
      val df = spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .option("includeTimestamp", true)
        .load()

      // 3. Transform the data: one (word, 1) pair per word on each line.
      //    Empty tokens (blank lines, repeated spaces) are dropped so that ""
      //    is never counted as a word.
      val wordPairs = df.flatMap { row =>
        row.getAs[String]("value")
          .split(" ")
          .filter(_.nonEmpty)
          .map(word => (word, 1))
      }
      val wordCounts = wordPairs
        .toDF("word", "number")
        .groupBy("word")
        .sum("number")

      // 4. Nothing runs until the query is started through writeStream.start().
      //    Complete mode writes the whole aggregated result on each trigger.
      val query = wordCounts.writeStream
        .outputMode(OutputMode.Complete())
        .format("console")
        .start()
      query.awaitTermination()
    } finally {
      // Release the session even when the query or its setup fails.
      spark.close()
    }
  }
}