我有一个火花工作,我正在两个数据帧之间进行外部联接。
第一个数据帧的大小为260 GB,文件格式为文本文件,分为2200个文件,第二个数据帧的大小为2GB。
然后,将大约260 GB的数据帧输出写入S3需要很长时间,因为我在EMR上进行了很大的更改,因此我取消了2个小时以上。
这是我的集群信息。
emr-5.9.0
Master: m3.2xlarge
Core: r4.16xlarge 10 machines (each machine has 64 vCore, 488 GiB memory,EBS Storage:100 GiB)
这是我正在设置的群集配置
capacity-scheduler yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site fs.s3.maxConnections: 200
spark maximizeResourceAllocation: true
spark-defaults spark.dynamicAllocation.enabled: true
我也尝试手动设置内存组件,如下所示,虽然性能更好,但同样又花了很长时间
--num-executors 60--conf spark.yarn.executor.memoryOverhead = 9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead = 3072 --driver-memory 26G --executor-cores 10-驱动核心3 --conf spark.default.parallelism = 1200
我没有使用默认分区将数据保存到S3中。
添加有关作业和查询计划的所有详细信息,以使其易于理解。
真正的原因是分区。这占用了大多数时间。
因为我有2K文件,所以如果我使用像200这样的重新分区,则输出
文件以十万个进来,然后再次在spark中加载不是一个好习惯
故事。
在下图中,我不知道为什么在项目后再次调用sort
在下面的图片中,GC对我而言太高了。.oi是否必须处理此问题,请提出如何建议?
下面是节点的健康状态。这一点数据已保存到S3中,这也怪不得为什么我只能看到两个节点处于活动状态而所有节点都处于空闲状态。
这是正在加载时的群集详细信息。在这一点上,我可以看到群集已得到充分利用,但是在将数据保存到S3时,许多节点是免费的。
最后,这是我执行加入然后保存到S3中的代码。
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
val latestForEachKey = df2resultTimestamp.withColumn("rank", row_number.over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val columnMap = latestForEachKey.columns.filter(c => c.endsWith("_1") & c != "FFAction|!|_1").map(c => c -> c.dropRight(2)) :+ ("FFAction|!|_1", "FFAction|!|")
val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
val exprsExtended = Array(col("uniqueFundamentalSet"), col("PeriodId"), col("SourceId"), col("StatementTypeCode"), col("StatementCurrencyId"), col("FinancialStatementLineItem_lineItemId")) ++ exprs
//Joining both dara frame here
val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))
//Joing ends here
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val headerFinal = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", headerFinal)
// dfMainOutputFinalWithoutNull.repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
.write
.partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
.format("csv")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "bzip2")
.save(outputFileURL)
最佳答案
您正在运行五个c3.4large EC2实例,每个实例具有30gb的RAM。因此,总共只有150GB,比要加入的大于200GB的数据帧小得多。因此大量磁盘溢出。也许您可以启动r型EC2实例(优化内存,而不是优化计算的c型),然后查看性能是否有所提高。