1.概念:

RDD是spark整个体系中最基础核心的概念,RDD(Resilient Distributed DataSet)即弹性分布式数据集

弹性:

RDD支持横向多分区,纵向操作内存不足写入磁盘,hdfs等,实现数据在内存和外存的灵活切换。

RDD可以在存储在内存和磁盘之间,并且自动或者手动切换

RDD具有良好的容错性(即RDD可以通过血统转化为其他RDD)

Task如果失败,会进行特定次数的重试(default 4)

Stage如果失败会就行特定次数的重试

RDD可以存储任意类型的数据

RDD的分区数目可以自行设定

分布式:

RDD可以存储在多台主机的内存或者磁盘之上。每个RDD可以分为多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而在集群中进行分布式并行计算。

数据集:

RDD是数据集合的抽象,从外部看RDD就是封装之后的可容错的数据集

RDD相当于是一个代理,对RDD进行操作其实就是对分区进行操作,就是对每一台机器上的迭代器进行操作,因为迭代器引用着我们要操作的数据。
RDD存储的是逻辑数据结构,不存储真实数据,像关系数据库中的view 视图,只是表结构。

2.RDD的五个特征:

A list of partitioner     一系列分区
A function for computing each split     会有一个函数作用在每个切片上
A list of depedencies on other RDDs    即RDD具有血统,RDD和RDD之间存在依赖关系
Optionally, a Partitioner for key-value RDDs (可选)如果是RDD中装的是KV类型的,那么Shuffle时会有一个分区器。默认是HashPartitioner。目前只有HashPartitioner 和RangeRartitioner
Optionally, a list of preferred locations to compute each split on (可选)如果只从HDFS中读取数据,会感知数据则位置,将Executor启动在数据所在的机器上

3.生成RDD的方式:

执行Transform操作(变换操作),根据已有的RDD计算得到

读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关的数据源。

将Driver的Scala集合通过并行化的方式变成RDD(试验、测验)
 

4.RDD的两种操作:

针对RDD的操作,分两种,一种是Transformation(变换),一种是Actions(执行)。

Transformation(变换)操作属于懒操作(算子),不会真正触发RDD的处理计算。

Actions(执行)操作才会真正触发。前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操作的主要区别是,Transformation转换操作(比如map、filter、join等)接受RDD并返回RDD,而Actions行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果)。

RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。

" src="https://oscimg.oschina.net/oscnet/20180728160456862.">


转换操作:对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
下面列出一些常见的转换操作(Transformation API):

filter(func):筛选出满足函数func的元素,并返回一个新的数据集
map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

行动操作:行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。 
下面列出一些常见的行动操作(Action API):

count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前n个元素
reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数func中运行
 

5.RDD的依赖关系:

RDD的依赖关系是spark计算优于hadoop的重要原因之一。

RDD中不同的操作会使得不同RDD中的分区会产生不同的依赖。

RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency)

窄依赖:对于窄依赖操作,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其他的处理,可以简单地认为只是将数据从一个形式转换到另一个形式。  如 map  filter  union等 

窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区

宽依赖:表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。宽依赖典型的操作包括groupByKey、sortByKey等

RDD-LMLPHP

窄依赖源码:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

    //返回子RDD的partitionId依赖的所有的parent RDD的Partition(s)

    def getParents(partitionId: Int): Seq[Int]

    override def rdd: RDD[T] = _rdd

}

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

    override def getParents(partitionId: Int) = List(partitionId)

}

宽依赖源码:

class ShuffleDependency[K, V, C](

    @transient _rdd: RDD[_ <: Product2[K, V]],

    val partitioner: Partitioner,

    val serializer: Option[Serializer] = None,

    val keyOrdering: Option[Ordering[K]] = None,

    val aggregator: Option[Aggregator[K, V, C]] = None,

    val mapSideCombine: Boolean = false)

extends Dependency[Product2[K, V]] {

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

//获取新的shuffleId

val shuffleId: Int = _rdd.context.newShuffleId()

//向ShuffleManager注册Shuffle的信息

val shuffleHandle: ShuffleHandle =

_rdd.context.env.shuffleManager.registerShuffle(

    shuffleId, _rdd.partitions.size, this)

    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}

spark中一旦遇到宽依赖就需要进行shuffle的操作,所谓的shuffle的操作的本质就是将数据汇总后重新分发的过程。

这个过程数据要汇总到一起,数据量可能很大所以不可避免的需要进行数据落磁盘的操作,会降低程序的性能,所以spark并不是完全内存不读写磁盘,只能说它尽力避免这样的过程来提高效率 。

spark中的shuffle,在早期的版本中,会产生多个临时文件,但是这种多临时文件的策略造成大量文件的同时的读写,磁盘的性能被分摊给多个文件,每个文件读写效率都不高,影响spark的执行效率。所以在后续的spark中(1.2.0之后的版本)的shuffle中,只会产生一个文件,并且数据会经过排序再附加索引信息,减少了文件的数量并通过排序索引的方式提升了性能。

6.RDD的运行流程

RDD-LMLPHP

" src="https://oscimg.oschina.net/oscnet/20180728160535549.">

1)Driver端 创建RDD对象 SparkContext根据用户提交的程序计算RDD之间的依赖关系,构建DAG

2)Driver端 DAGScheduler将DAG 切分Stage(切分的依据是遇到宽依赖shuffle),将stage中生成的Task以TaskSet的形式给TaskScheduler

3)Driver端 TaskScheduler调度Task(根据资源情况将Task调度到对应的Executor中)

 4)Executor接收Task,然后用实现了Runnable接口的包装类将Task包装起来丢入到线程池中执行。

7、RDD底层实现原理
RDD是一个分布式数据集,顾名思义,其数据应该分部存储于多台机器上。事实上,每个RDD的数据都以Block的形式存储于多台机器上,下图是Spark的RDD存储架构图,其中每个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD不再需要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。

8、RDD cache的原理
RDD的转换过程中,并不是每个RDD都会存储,如果某个RDD会被重复使用,或者计算其代价很高,那么可以通过显示调用RDD提供的cache()方法,把该RDD存储下来。那RDD的cache是如何实现的呢?

RDD中提供的cache()方法只是简单的把该RDD放到cache列表中。当RDD的iterator被调用时,通过CacheManager把RDD计算出来,并存储到BlockManager中,下次获取该RDD的数据时便可直接通过CacheManager从BlockManager读出。
 

02-17 20:05