RDD的转换

  Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。

Spark Scala版本的Word Count程序如下:

1: val file = spark.textFile("hdfs://...")
2: val counts = file.flatMap(line => line.split(" "))
3:  .map(word => (word, 1))
4:  .reduceByKey(_ + _)
5: counts.saveAsTextFile("hdfs://...")

file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。那么上面这5行代码的具体实现是什么呢?

1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

  spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。

  也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD

3)行3:将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word, 1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

5)行5:首先会生成一个MapPartitionsRDD,这个RDD会通过调用org.apache.spark.rdd.PairRDDFunctions#saveAsHadoopDataset向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。

RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。

RDD的依赖关系(宽依赖和窄依赖)

如,假设,现在如下

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

所以,

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

比如,我这里是刚好是4台worker1、worker2、worker3、worker4。还有1台Master。

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

soga,

val file = spark.textFile("hdfs://...")

1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

  spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。

  也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

想要成为高手,一定要多看源码,看上几十遍都太少了,包括看上10个版本的源码。无论是hadoop、还是spark

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

val counts = file.flatMap(line => line.split(" ")) 2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

.map(word => (word, 1))    3)行3:将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word, 1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

至此,windows本地,已经完成了。

下面是在网络里了。

注意啦! 分区是计算概念,分片是数据概念。

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

有4台worker,每台都在自己内存计算。

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

.reduceByKey(_ + _)

4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

总结:

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

第一个stage :

HadoopRDD  ->   MapPartitionRDD  ->   MapPartitionsRDD  ->  MapPartitionsRDD  ->  MapPartitionsRDD

Spark RDD概念学习系列之RDD的转换(十)-LMLPHP

第二个stage :  

  Stage shuffledRDD   ->  MapPartitionsRDD

04-21 17:49