简介
sparkStream官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html#overview
sparkStream是构建在spark core之上的实时流处理框架,它支持很多的数据源,如:
你可以从kafka等各种数据源中实时获取数据流,然后经过spark计算,持久化或者实时的dashBoard展示。
sparkStream的实时计算其实也可以称为微批处理计算,它将数据流按照一定的时间段分割成小批的数据,然后将对数据流的操作转换为对RDD的操作,整个流计算的中间结果进行叠加存储到内存或者外部设备,如图:
代码示例
下面将使用tcp socket作为数据源,每隔1秒钟发送字符数据。sparkstream将在启动以后,将收集10秒的数据作为一个批数据进行统计处理,代码如下:
import java.net.ServerSocket import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext} /**
* @Description sparkStream demo
* @Author lay
* @Date 2018/12/08 21:43
*/
object SparkStreamDemo {
var conf: SparkConf = _
var sc: SparkContext = _
var ssc: StreamingContext = _ def init(): Unit = {
conf = new SparkConf().setAppName("spark stream demo").setMaster("local[2]")
sc = new SparkContext(conf)
sc.setLogLevel("warn")
// 时间片为10秒钟
ssc = new StreamingContext(sc, Seconds(10))
} def main(args: Array[String]): Unit = {
// 初始化socket流
initSocketStream()
// 初始化SparkStream
init()
// 从socket获取DStream
val lines = ssc.socketTextStream("localhost", 8888)
// 统计字数
val wordCount = lines.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_)
// 打印结果
wordCount.print()
// 启动
ssc.start()
println("spark stream started")
} def initSocketStream(): Unit = {
new Thread(new Runnable {
override def run(): Unit = {
val serverSocket = new ServerSocket(8888)
val socket = serverSocket.accept()
println("accepted")
for (i <- 1 to 10) {
val text = "what is this\n"
socket.getOutputStream.write(text.getBytes("utf-8"))
Thread.sleep(1000)
}
println("waiting")
Thread.sleep(50000)
socket.close()
serverSocket.close()
println("closed")
}
}).start()
println("thread started")
}
}
注意:
1)这里的master设置为"local[2]",是因为spark起码需要两个线程,一个线程用来接收数据,另一个线程用来处理数据。
2)"what is this\n"这里加了一个'\n'字符,是因为字节流的接收将会以这个字符作为分隔符。
你会看到类似如下的打印:
-------------------------------------------
Time: 1544281700000 ms
-------------------------------------------
(this,10)
(is,10)
(what,10)