进入主页,点击右上角“设为星标”
比别人更快接收好文章
Spark Core
一、RDD详解
1. 为什么要有RDD?
在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,之前的MapReduce框架采用非循环式的数据流模型,把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。
AMP实验室发表的一篇关于RDD的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》就是为了解决这些问题的。
RDD提供了一个抽象的数据模型,让我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy...)。
2. RDD是什么?
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。单词拆解:
3. RDD主要属性
进入RDD的源码中看下:
在源码中可以看到有对RDD介绍的注释,我们来翻译下:
总结
RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:
分区列表、分区函数、最佳位置,这三个属性其实说的就是数据集在哪,在哪计算更合适,如何分区;
计算函数、依赖关系,这两个属性其实说的是数据集怎么来的。
二、RDD-API
1. RDD的创建方式
makeRDD方法底层调用了parallelize方法:
2. RDD的算子分类
RDD的算子分为两类:
3. Transformation转换算子
4. Action动作算子
统计操作:
三、RDD的持久化/缓存
在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了
持久化/缓存API详解
RDD通过persist或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
通过查看RDD的源码发现cache最终也是调用了persist无参方法(默认存储只存在内存中):
默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
总结:
四、RDD容错机制Checkpoint
持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。
Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。
用法:
SparkContext.setCheckpointDir("目录") //HDFS的目录
RDD.checkpoint
五、RDD依赖关系
1. 宽窄依赖
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖;
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)。
2. 为什么要设计宽窄依赖
窄依赖的多个分区可以并行计算;
窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。
划分Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。
六、DAG的生成和划分Stage
1. DAG介绍
DAG(Directed Acyclic Graph有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是RDD执行的流程);
原始的RDD通过一系列的转换操作就形成了DAG有向无环图,任务执行时,可以按照DAG的描述,执行真正的计算(数据被操作的一个过程)。
开始:通过SparkContext创建的RDD;
结束:触发Action,一旦触发Action就形成了一个完整的DAG。
2.DAG划分Stage
一个Spark程序可以有多个DAG(有几个Action,就有几个DAG,上图最后只有一个Action(图中未表现),那么就是一个DAG)。
一个DAG可以有多个Stage(根据宽依赖/shuffle进行划分)。
同一个Stage可以有多个Task并行执行(task数=分区数,如上图,Stage1 中有三个分区P1、P2、P3,对应的也有三个 Task)。
可以看到这个DAG中只reduceByKey操作是一个宽依赖,Spark内核会以此为边界将其前后划分成不同的Stage。
同时我们可以注意到,在图中Stage1中,从textFile到flatMap到map都是窄依赖,这几步操作可以形成一个流水线操作,通过flatMap操作生成的partition可以不用等待整个RDD计算结束,而是继续进行map操作,这样大大提高了计算的效率。
一个复杂的业务逻辑如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照shuffle进行划分(也就是按照宽依赖就行划分),就可以将一个DAG划分成多个Stage/阶段,在同一个Stage中,会有多个算子操作,可以形成一个pipeline流水线,流水线内的多个平行的分区可以并行执行。
对于窄依赖,partition的转换处理在stage中完成计算,不划分(将窄依赖尽量放在在同一个stage中,可以实现流水线计算)。
对于宽依赖,由于有shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要要划分stage。
总结:
Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中
具体的划分算法请参见AMP实验室发表的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se
文章推荐:
Spark底层执行原理详细解析
本文分享自微信公众号 - 五分钟学大数据(gh_d4a7af3ecd50)。
如有侵权,请联系 [email protected] 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。