1、简介
Spark Streaming处理的数据流图:
Spark Streaming在内部的处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。
对应的批数据,在Spark内核对应一个RDD实例,因此,对应流数据的DStream可以看成是一组RDDs,即RDD的一个序列。通俗点理解的话,在流数据分成一批一批后,通过一个先进先出的队列,然后 Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,然后进行处理,这是一个典型的生产者消费者模型。
1.2 术语定义
l离散流(discretized stream)或DStream:Spark Streaming对内部持续的实时数据流的抽象描述,即我们处理的一个实时数据流,在Spark Streaming中对应于一个DStream 实例。
l批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。
l时间片或批处理时间间隔( batch interval):人为地对流数据进行定量的标准,以时间片作为我们拆分流数据的依据。一个时间片的数据对应一个RDD实例。
l窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数,
l滑动时间间隔:前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数
lInput DStream :一个input DStream是一个特殊的DStream,将Spark Streaming连接到一个外部数据源来读取数据。
2、运行原理
2.1 Streaming架构
SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。
l计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备。下图显示了Spark Streaming的整个流程。
图Spark Streaming构架
2.2 编程模型
DStream(Discretized Stream)作为Spark Streaming的基础抽象,它代表持续性的数据流。这些数据流既可以通过外部输入源赖获取,也可以通过现有的Dstream的transformation操作来获得。在内部实现上,DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流。如图7-3所示。
图7-3 DStream中在时间轴下生成离散的RDD序列
对DStream中数据的各种操作也是映射到内部的RDD上来进行的,如图7-4所示,对Dtream的操作可以通过RDD的transformation生成新的DStream。这里的执行引擎是Spark。
2.2.1 如何使用Spark Streaming
"""
Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every seconds.
Usage: direct_kafka_wordcount.py <broker_list> <topic>
To run this on your local machine, you need to setup Kafka and create a producer first, see
http://kafka.apache.org/documentation.html#quickstart
and then run the example
`$ bin/spark-submit --jars \
external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
examples/src/main/python/streaming/direct_kafka_wordcount.py \
localhost: test`
"""
from __future__ import print_function import sys from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils if __name__ == "__main__":
if len(sys.argv) != :
print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
exit(-) sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, ) brokers, topic = sys.argv[:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
#这里kafka产生的是一个map, key是null, value是实际发送的数据,所以取x[1]
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, )) \
.reduceByKey(lambda a, b: a+b)
counts.pprint() ssc.start()
ssc.awaitTermination()
1.创建StreamingContext对象 同Spark初始化需要创建SparkContext对象一样,使用Spark Streaming就需要创建StreamingContext对象。创建StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master,设定名称。Spark Streaming需要指定处理数据的时间间隔,如上例所示的2s,那么Spark Streaming会以2s为时间窗口进行数据处理。此参数需要根据用户的需求和集群的处理能力进行适当的设置;
2.创建InputDStream Spark Streaming需要指明数据源。如socketTextStream,Spark Streaming以socket连接作为数据源读取数据。当然Spark Streaming支持多种不同的数据源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter等数据源;
3.操作DStream 对于从数据源得到的DStream,用户可以在其基础上进行各种操作,如上例所示的操作就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源得到的数据首先进行分割,然后利用Map和ReduceByKey方法进行计算,当然最后还有使用print()方法输出结果;
4.启动Spark Streaming 之前所作的所有步骤只是创建了执行流程,程序没有真正连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划,当ssc.start()启动后程序才真正进行所有预期的操作。
至此对于Spark Streaming的如何使用有了一个大概的印象,在后面的章节我们会通过源代码深入探究一下Spark Streaming的执行流程
2.2.3 DStream的操作
与RDD类似,DStream也提供了自己的一系列操作方法,这些操作可以分成三类:普通的转换操作、窗口转换操作和输出操作。
2.2.3.1 普通的转换操作
普通的转换操作如下表所示:
转换 | 描述 |
map(func) | 源 DStream的每个元素通过函数func返回一个新的DStream。 |
flatMap(func) | 类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。 |
filter(func) | 在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。 |
repartition(numPartitions) | 通过输入的参数numPartitions的值来改变DStream的分区大小。 |
union(otherStream) | 返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。 |
count() | 对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。 |
reduce(func) | 使用函数func(有两个参数并返回一个结果)将源DStream 中每个RDD的元素进行聚 合操作,返回一个内部所包含的RDD只有一个元素的新DStream。 |
countByValue() | 计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。 |
reduceByKey(func, [numTasks]) | 当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意:默认情况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),可以通过配置numTasks设置不同的并行任务数。 |
join(otherStream, [numTasks]) | 当被调用类型分别为(K,V)和(K,W)键值对的2个DStream时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。 |
cogroup(otherStream, [numTasks]) | 当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。 |
transform(func) | 通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。 |
updateStateByKey(func) | 返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。 |
在上面列出的这些操作中,transform()方法和updateStateByKey()方法值得我们深入的探讨一下:
l transform(func)操作
该transform操作(转换操作)连同其其类似的 transformWith操作允许DStream 上应用任意RDD-to-RDD函数。它可以被应用于未在DStream API 中暴露任何的RDD操作。例如,在每批次的数据流与另一数据集的连接功能不直接暴露在DStream API 中,但可以轻松地使用transform操作来做到这一点,这使得DStream的功能非常强大。
l updateStateByKey操作
该 updateStateByKey 操作可以让你保持任意状态,同时不断有新的信息进行更新。要使用此功能,必须进行两个步骤 :
(1) 定义状态 - 状态可以是任意的数据类型。
(2) 定义状态更新函数 - 用一个函数指定如何使用先前的状态和从输入流中获取的新值 更新状态。
让我们用一个例子来说明,假设你要进行文本数据流中单词计数。在这里,正在运行的计数是状态而且它是一个整数。我们定义了更新功能如下:
详细案例参考:
http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#transformations-on-dstreams
此函数应用于含有键值对的DStream中(如前面的示例中,在DStream中含有(word,1)键值对)。它会针对里面的每个元素(如wordCount中的word)调用一下更新函数,newValues是最新的值,runningCount是之前的值。
2.2.3.2 窗口转换操作
Spark Streaming 还提供了窗口的计算,它允许你通过滑动窗口对数据进行转换,窗口转换操作如下:
转换 | 描述 |
window(windowLength, slideInterval) | 返回一个基于源DStream的窗口批次计算后得到新的DStream。 |
countByWindow(windowLength,slideInterval) | 返回基于滑动窗口的DStream中的元素的数量。 |
reduceByWindow(func, windowLength,slideInterval) | 基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。 |
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) | 基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。 |
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) | 一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) | 基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。 |
2.2.3.3 输出操作
Spark Streaming允许DStream的数据被输出到外部系统,如数据库或文件系统。由于输出操作实际上使transformation操作后的数据可以通过外部系统被使用,同时输出操作触发所有DStream的transformation操作的实际执行(类似于RDD操作)。以下表列出了目前主要的输出操作:
转换 | 描述 |
print() | 在Driver中打印出DStream中数据的前10个元素。 |
saveAsTextFiles(prefix, [suffix]) | 将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsObjectFiles(prefix, [suffix]) | 将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsHadoopFiles(prefix, [suffix]) | 将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
foreachRDD(func) | 最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。 |
dstream.foreachRDD是一个非常强大的输出操作,它允将许数据输出到外部系统。详细案例请参考:
http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#output-operations-on-dstreams
3、spark整合kafka
用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。
基于Receiver的方式
这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据。如下图:
还有几个需要注意的点:
- 在Receiver的方式中,ssc中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
- 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
- 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是
KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
对于所有的receivers接收到的数据将会保存在spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上
直接读取方式
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition偏移量范围来处理每个batch。其形式如下图:
这种方法相较于Receiver方式的优势在于:
- 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,会创建和kafka分区一样的rdd个数。
- 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,第一次是被kafka复制,另一次是写到wal中,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
- 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具。
以上主要是对官方文档[1]的一个简单翻译,详细内容大家可以直接看下官方文档这里不再赘述。
http://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html
不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然zookeeper就保存了当前消费的offset值,那么如果重新启动开始消费就会接着上一次offset值继续消费。
而在Direct的方式中,我们是直接从kafka来读数据,那么offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到zookeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到zookeeper中的通用类,我将其放在了github上:
Spark streaming+Kafka demo
示例中KafkaManager是一个通用类,而KafkaCluster是kafka源码中的一个类,由于包名权限的原因我把它单独提出来,ComsumerMain简单展示了通用类的使用方法,在每次创建KafkaStream时,都会先从zooker中查看上次的消费记录offsets,而每个batch处理完成后,会同步offsets到zookeeper中。
refer:http://blog.csdn.net/zhong_han_jun/article/details/50814038
参考:
Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍
Spark Streaming 与 Kafka 集成分析
http://blog.selfup.cn/1665.html#comments