1.搭建集群

在Spark安装路径下 spark/conf/spark-env.sh配置:

SPARK_WORKER_CORES=XXX

SPARK_WORKER_MEMORY=XXX

2.提交任务

提交任务命令,最好使用脚本化提交,可以在提交任务时给当前spark应用程序足够的资源,提交命令:

./spark-submit --master spark://node1:7077 -class ... jar ...

可以在-master 之后-class之前添加以下参数

--dirver-cores:指定Driver端使用的core数。

--driver-memory:指定Driver端的使用内存数,如果Driver端有回收数据广播的时候,可以多申请内存。

--executor-cores:指定executor端使用的core数量,一个core在某个时刻并行执行1个task任务,executor中的core越多并行的task任务越多。

--executor-memory:指定Executor端使用的内存,如果shuffle量多、对RDD持久化多、task创建任务多,可以多申请点资源。

--total-executor-cores:Standalone集群中,指定当前应用程序最多使用多少core.在Standalone集群中,默认提交一个Spark应用程序,这个Spark应用程序会使用集群中所有的资源。

--num-executor:在yarn集群中,默认为一个提交的spark应用程序启动2个Executor。

-处理以上在提交任务中可以设置参数,还可以在代码中设置(不建议硬编码)

也可以在spark提交任务的客户端中的配置文件中配置:spark/conf/spark-defaults.xml(不建议):

spark.driver.cores

spark.driver.memory

spark.executor.cores

spark.executor.memory

spark.cores.max

3.提高并行度

-SparkContext.textFile(xx,minum)

-SparkContext.parallelize(xx,num)

-SparkContext.makeRDD(xx,num)

-SparkContext.parallelizePairs(List<Tuple2<xx,xx>>,num)

-RDD的算子repartition(num)/coalesce(num)、reduceByKey(num)、join(num)、groupByKey(num),distinct(num)

-"spark.default.parallelism" Spark中默认的并行度

-自定义分区器也可以增加并行度

-"Spark.sql.shuffle.partitions"设置SparkSQL中解析成SparkJob的并行度。

-SparkStreaming中Driect模式可以增大读取的topic的partition个数。

4.代码优化

-避免创建重复的RDD,这种情况是避免某些代码逻辑不执行,逻辑混乱。

-对多次使用的RDD进行持久化,持久化算子:cache()=persist()=persist(StroageLevel.MEMORY_ONLY)

如果内存够用就使用MEMORY_ONLYJ级别,如果不是内存太充足,可以采用MEMOEY_ONLY_SER级别,相当于牺牲性能换取空间。如果使用MEMORY_ONLY_SER级别还是不能完全储存,可以使用MEMORY_AND_DISK或者MEMORY_AND_DISK_SER.

注意:对RDD进行持久化时,一定要使用Action算子触发,才能使RDD被持久化。

-尽量避免使用shuffle类的算子。利用广播变量+filter类的算子代替join

-尽量使用map端有预聚合的算子(map-combine)

5.map端预聚合的好处:

(1)减少map端shuffle的落地数据量

(2)减少reduce端拉取的数据量

(3)极少节点之间传输的数据量

map端预聚合的算子:

(1)reduceByKey

(2)aggregateByKey

(3)combineByKey

6.使用高性能算子:

(1)reduceByKey代替groupByKey

(2)使用mappartition代替map

(3)使用foreachPartitions代替foreach

(4)对大量分区过滤后使用repartition/coalesce减少分区

7.使用广播变量

如果Executor端使用到了Driver端的变量副本,如果不使用广播变量,在每个Executor端有多少个task任务就有多少个变量副本。

如果使用广播变量,Executor端的变量副本只有一份,所有的Executor端使用一份共享的变量副本。可以节省大量的内存。

8.优化数据结构

在编写Spark代码时

(1)尽量使用原生的数据类型代替字符串

(2)尽量使用字符串代替对象

(3)尽量使用数组代替map集合

(4)尽量使用tinyint代表int

9.使用Kryo序列化方式

在Spark中的序列化:

(1)在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

(2)将自定义的类型作为RDD的

 

09-03 17:17