1.尽可能复用同一个RDD

2.对重复使用的RDD进行持久化

3.对DAG过长的计算增加chekpoint(检查点)机制,将文件最好保存在HDFS中(多副本)

4.选择一种合适的持久化策略

  • 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

  • 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

  • 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

  • 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

5.更改程序shufflle的序列化方式为Kryo

方法一:修改spark-defaults.conf配置文件
设置:
spark.serializer  org.apache.spark.serializer.KryoSerializer
注意:用空格隔开
方法二:启动spark-shell或者spark-submit时配置
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
方法三:在代码中
val conf = new SparkConf()
conf.set(“spark.serializer”,“org.apache.spark.serializer.KryoSerializer”)

6.测试的时候可以使用collect,但是生产环境上不可以使用。Driver端会爆炸

7.开启推迟执行机制

可以设置spark.speculation  true

开启后,spark会检测执行较慢的Task,并复制这个Task在其他节点运行,最后哪个节点先运行完,就用其结果,然后将慢Task 杀死

8.配置多临时文件目录

spark.local.dir参数。当shuffle、归并排序(sort、merge)时都会产生临时文件。这些临时文件都在这个指定的目录下。那这个文件夹有很多临时文件,如果都发生读写操作,有的线程在读这个文件,有的线程在往这个文件里写,磁盘I/O性能就非常低。

可以创建多个文件夹每个文件夹都对应一个真实的硬盘。假如原来是3个程序同时读写一个硬盘,效率肯定低,现在让三个程序分别读取3个磁盘,这样冲突减少,效率就提高了。这样就有效提高外部文件读和写的效率。怎么配置呢?只需要在这个配置时配置多个路径就可以。中间用逗号分隔。

spark.local.dir=/home/tmp,/home/tmp2

然后需要把每个目录挂载到不同的磁盘上

9.进行join时,可以将小表广播出去 

10.有些情况下,RDD操作使用MapPartitions替代map

比如JdisPool,mysql连接池的创建和销毁操作

map方法对RDD的每一条记录逐一操作。mapPartitions是对RDD里的每个分区操作

rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}

这样频繁的链接、断开数据库,效率差。

rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}

这样就一次链接一次断开,中间批量操作,效率提升。

参数配置调优

  • num-executors:该参数一定会被设置,Yarn 会按照 Driver 的申请去最终为当前的 Application 生产指定个数的 Executors,实际生产环境下应该分配80个左右 Executors 会比较合适呢。
  • executor-memory:这个定义了每个 Executor 的内存,它与 JVM OOM 紧密相关,很多时候甚至决定了 Spark 运行的性能。实际生产环境下建义是 8G 左右,很多时候 Spark 运行在 Yarn 上,内存占用量不要超过 Yarn 的内存资源的 50%。
  • executor-cores:决定了在 Executors 中能够并行执行的 Tasks 的个数。实际生产环境下应该分配4个左右,一般情况下不要超过 Yarn 队列中 Cores 总数量的 50%。
  • driver-memory:默应是 1G
  • spark.default.parallelizm:并行度问题,如果不设置这个参数,Spark 会跟据 HDFS 中 Block 的个数去设置这一个数量,原理是默应每个 Block 会对应一个 Task,默应情况下,如果数据量不是太多就不可以充份利用 executor 设置的资源,就会浪费了资源。建义设置为 100个,最好 700个左右。Spark官方的建义是每一个 Core 负责 2-3 个 Task。 
  • spark.storage.memoryFraction:默应占用 60%,如果计算比较依赖于历史数据则可以调高该参数,当如果计算比较依赖 Shuffle 的话则需要降低该比例。
  • spark.shuffle.memoryFraction:默应占用 20%,如果计算比较依赖 Shuffle 的话则需要调高该比例。
  • spark.shuffle.file.buffer:默认值:32k  参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • spark.reducer.maxSizeInFlight:默认值:48m 该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

  • spark.shuffle.io.maxRetries:默认值:3  shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
    调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

  • spark.shuffle.io.retryWait :默认值:5s参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

  • spark.shuffle.manager:默认值:sort  该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
    调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug

  • spark.shuffle.sort.bypassMergeThreshold

    默认值:200
    参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

  • spark.shuffle.consolidateFiles:默认值:false 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

03-02 17:24