Problem description
I am running the code below in Spark to compare data stored in a CSV file with data in a Hive table. My data file is about 1.5 GB and has roughly 200 million rows. When I run the code, I get a GC overhead limit exceeded error. I am not sure why I am getting this error, and I have searched various articles.
The error occurs at the Test 3 step, sourceDataFrame.except(targetRawData).count > 0.
I am not sure whether there is a memory leak. How can I debug and resolve this?
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext
//val conf = new SparkConf().setAppName("Simple Application")
//val sc = new SparkContext(conf)
val hc = new HiveContext(sc)
val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()
// set source and target location
//val sourceDataLocation = "hdfs://localhost:9000/sourcec.txt"
val sourceDataLocation = "s3a://rbspoc-sas/sas_valid_large.txt"
val targetTableName = "temp_TableA"
// Extract source data
println("Extracting SAS source data from csv file location " + sourceDataLocation);
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sourceRawCsvData = sc.textFile(sourceDataLocation)
println("Extracting target data from hive table " + targetTableName)
val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)
// Add the test cases here
// Test 1 - Validate the Structure
println("Validating the table structure...")
var startTime = getTimestamp()
val headerColumns = sourceRawCsvData.first().split(",").to[List]
val schema = TableASchema(headerColumns)
val sourceData = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
  .map(_.split(",").toList)
  .map(row)
val sourceDataFrame = spark.createDataFrame(sourceData,schema)
//val sourceDataFrame = sourceDataFrame.toDF(sourceDataFrame.columns map(_.toLowerCase): _*)
val sourceSchemaList = flatten(sourceDataFrame.schema).map(r => r.dataType.toString).toList
val targetSchemaList = flatten(targetRawData.schema).map(r => r.dataType.toString).toList
var endTime = getTimestamp()
if (sourceSchemaList.diff(targetSchemaList).length > 0) {
  println("Updating StructureValidation result in table...")
  UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed StructureValidation. ")
  // Force exit here if needed
  // sys.exit(1)
} else {
  println("Updating StructureValidation result in table...")
  UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed StructureValidation. ")
}
// Test 2 - Validate the Row count
println("Validating the Row count...")
startTime = getTimestamp()
// check the row count.
val sourceCount = sourceData.count()
val targetCount = targetRawData.count()
endTime = getTimestamp()
if (sourceCount != targetCount){
  println("Updating RowCountValidation result in table...")
  // Update the result in the table
  UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
  // Force exit here if needed
  //sys.exit(1)
}
else{
  println("Updating RowCountValidation result in table...")
  // Update the result in the table
  UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
}
// Test 3 - Validate the data
println("Comparing source and target data...")
startTime = getTimestamp()
if (sourceDataFrame.except(targetRawData).count > 0 ){
  endTime = getTimestamp()
  // Update the result in the table
  println("Updating DataValidation result in table...")
  UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed DataMatch validation")
  // Force exit here if needed
  // sys.exit(1)
}
else{
  endTime = getTimestamp()
  println("Updating DataValidation result in table...")
  // Update the result in the table
  UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed DataMatch validation")
}
// Test 4 - Calculate the average and variance of Int or Dec columns
// Test 5 - String length validation
def UpdateResult(tableName: String, startTime: String, endTime: String, returnCode: Int, description: String){
  val insertString = s"INSERT INTO TABLE TestResult VALUES( FROM_UNIXTIME(UNIX_TIMESTAMP()),'$startTime','$endTime','$tableName',$returnCode,'$description')"
  val a = hc.sql(insertString)
}

def TableASchema(columnName: List[String]): StructType = {
  StructType(
    Seq(
      StructField(name = "datetime", dataType = TimestampType, nullable = true),
      StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
      StructField(name = "source_bank", dataType = StringType, nullable = true),
      StructField(name = "emp_name", dataType = StringType, nullable = true),
      StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
      StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
    )
  )
}

def row(line: List[String]): Row = {
  Row(convertToTimestamp(line(0).trim), convertToDate(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
}

def convertToTimestamp(s: String) : Timestamp = s match {
  case "" => null
  case _ => {
    val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => t
      case Failure(_) => null
    }
  }
}

def convertToDate(s: String) : Timestamp = s match {
  case "" => null
  case _ => {
    val format = new SimpleDateFormat("ddMMMyyyy")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => t
      case Failure(_) => null
    }
  }
}

def flatten(scheme: StructType): Array[StructField] = scheme.fields.flatMap { f =>
  f.dataType match {
    case struct: StructType => flatten(struct)
    case _ => Array(f)
  }
}

def getTimestamp(): String = {
  val now = java.util.Calendar.getInstance()
  val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  timestampFormat.format(now.getTime())
}
The exception is below:
17/12/21 05:18:40 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(8,0,ShuffleMapTask,TaskKilled(stage cancelled),org.apache.spark.scheduler.TaskInfo@78db3052,null)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 8.0 failed 1 times, most recent failure: Lost task 17.0 in stage 8.0 (TID 323, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2430)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2429)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
... 53 elided
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
scala> 17/12/21 05:18:40 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
java.io.IOException: Failed to delete: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
Recommended answer
Your Spark process is spending too much time in garbage collection: most of the CPU is being consumed by GC and the processing never completes, because the executors are running out of memory.
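Before tuning anything, it is worth confirming that GC really is the bottleneck. A minimal sketch of enabling GC logging, assuming a Java 8 JVM (the trailing "..." stands for the rest of your submit command):

spark-submit \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --driver-java-options "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  ...

The Executors tab of the Spark UI also reports GC time per executor, which should show the same picture.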
You can try the options below:

- Tune the properties spark.storage.memoryFraction and spark.memory.storageFraction. You can also raise executor memory on the command line, e.g. spark-submit ... --executor-memory 4096m --num-executors 20 (see the sketch after this list).
- Or change the GC policy: check the current GC settings and switch to the G1 collector with -XX:+UseG1GC (also shown in the sketch below).
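Below is a minimal sketch of what a tuned submission might look like, combining both suggestions. The values are illustrative starting points rather than recommendations, and the trailing "..." stands for the rest of the submit command. Note that spark.storage.memoryFraction is the legacy pre-1.6 setting; on Spark 2.x the unified memory manager uses spark.memory.fraction and spark.memory.storageFraction instead.

spark-submit \
  --executor-memory 4096m \
  --num-executors 20 \
  --conf spark.memory.fraction=0.8 \
  --conf spark.memory.storageFraction=0.3 \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
  ...

One caveat for the code in the question: it sets spark.master to local, and in local mode the executors live inside the driver JVM, so --executor-memory has no effect there; raising --driver-memory (or running against a real cluster) is what actually gives the except comparison more heap.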