1.搭建集群
在Spark安装路径下 spark/conf/spark-env.sh配置:
SPARK_WORKER_CORES=XXX
SPARK_WORKER_MEMORY=XXX
2.提交任务
提交任务命令,最好使用脚本化提交,可以在提交任务时给当前spark应用程序足够的资源,提交命令:
./spark-submit --master spark://node1:7077 -class ... jar ...
可以在-master 之后-class之前添加以下参数
--dirver-cores:指定Driver端使用的core数。
--driver-memory:指定Driver端的使用内存数,如果Driver端有回收数据广播的时候,可以多申请内存。
--executor-cores:指定executor端使用的core数量,一个core在某个时刻并行执行1个task任务,executor中的core越多并行的task任务越多。
--executor-memory:指定Executor端使用的内存,如果shuffle量多、对RDD持久化多、task创建任务多,可以多申请点资源。
--total-executor-cores:Standalone集群中,指定当前应用程序最多使用多少core.在Standalone集群中,默认提交一个Spark应用程序,这个Spark应用程序会使用集群中所有的资源。
--num-executor:在yarn集群中,默认为一个提交的spark应用程序启动2个Executor。
-处理以上在提交任务中可以设置参数,还可以在代码中设置(不建议硬编码)
也可以在spark提交任务的客户端中的配置文件中配置:spark/conf/spark-defaults.xml(不建议):
spark.driver.cores
spark.driver.memory
spark.executor.cores
spark.executor.memory
spark.cores.max
3.提高并行度
-SparkContext.textFile(xx,minum)
-SparkContext.parallelize(xx,num)
-SparkContext.makeRDD(xx,num)
-SparkContext.parallelizePairs(List<Tuple2<xx,xx>>,num)
-RDD的算子repartition(num)/coalesce(num)、reduceByKey(num)、join(num)、groupByKey(num),distinct(num)
-"spark.default.parallelism" Spark中默认的并行度
-自定义分区器也可以增加并行度
-"Spark.sql.shuffle.partitions"设置SparkSQL中解析成SparkJob的并行度。
-SparkStreaming中Driect模式可以增大读取的topic的partition个数。
4.代码优化
-避免创建重复的RDD,这种情况是避免某些代码逻辑不执行,逻辑混乱。
-对多次使用的RDD进行持久化,持久化算子:cache()=persist()=persist(StroageLevel.MEMORY_ONLY)
如果内存够用就使用MEMORY_ONLYJ级别,如果不是内存太充足,可以采用MEMOEY_ONLY_SER级别,相当于牺牲性能换取空间。如果使用MEMORY_ONLY_SER级别还是不能完全储存,可以使用MEMORY_AND_DISK或者MEMORY_AND_DISK_SER.
注意:对RDD进行持久化时,一定要使用Action算子触发,才能使RDD被持久化。
-尽量避免使用shuffle类的算子。利用广播变量+filter类的算子代替join
-尽量使用map端有预聚合的算子(map-combine)
5.map端预聚合的好处:
(1)减少map端shuffle的落地数据量
(2)减少reduce端拉取的数据量
(3)极少节点之间传输的数据量
map端预聚合的算子:
(1)reduceByKey
(2)aggregateByKey
(3)combineByKey
6.使用高性能算子:
(1)reduceByKey代替groupByKey
(2)使用mappartition代替map
(3)使用foreachPartitions代替foreach
(4)对大量分区过滤后使用repartition/coalesce减少分区
7.使用广播变量
如果Executor端使用到了Driver端的变量副本,如果不使用广播变量,在每个Executor端有多少个task任务就有多少个变量副本。
如果使用广播变量,Executor端的变量副本只有一份,所有的Executor端使用一份共享的变量副本。可以节省大量的内存。
8.优化数据结构
在编写Spark代码时
(1)尽量使用原生的数据类型代替字符串
(2)尽量使用字符串代替对象
(3)尽量使用数组代替map集合
(4)尽量使用tinyint代表int
9.使用Kryo序列化方式
在Spark中的序列化:
(1)在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
(2)将自定义的类型作为RDD的