This article describes how to deal with Spark's "GC overhead limit exceeded" error message and should be a useful reference for anyone hitting the same problem.

Problem description

I am running the code below in Spark to compare data stored in a csv file with data in a Hive table. My data file is about 1.5 GB and has about 200 million rows. When I run the code, I get a GC overhead limit exceeded error. I am not sure why I am getting this error; I have searched various articles.

The error occurs at the Test 3 step, sourceDataFrame.except(targetRawData).count > 0. I am not sure whether there is a memory leak. How can I debug and resolve it?

import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

   // set source and target location
    //val sourceDataLocation = "hdfs://localhost:9000/sourcec.txt"
    val sourceDataLocation = "s3a://rbspoc-sas/sas_valid_large.txt"
    val targetTableName = "temp_TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)


    // Add the test cases here

    // Test 1 - Validate the Structure
       println("Validating the table structure...")
       var startTime = getTimestamp()
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       val sourceData = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
       .map(_.split(",").toList)
       .map(row)

       val sourceDataFrame = spark.createDataFrame(sourceData,schema)
       //val sourceDataFrame = sourceDataFrame.toDF(sourceDataFrame.columns map(_.toLowerCase): _*)

       val sourceSchemaList = flatten(sourceDataFrame.schema).map(r => r.dataType.toString).toList
       val targetSchemaList = flatten(targetRawData.schema).map(r => r.dataType.toString).toList
       var endTime = getTimestamp()
       if (sourceSchemaList.diff(targetSchemaList).length > 0) {
           println("Updating StructureValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed StructureValidation. ")
           // Force exit here if needed
          // sys.exit(1)
       } else {
           println("Updating StructureValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed StructureValidation. ")
       }

    // Test 2 - Validate the Row count
       println("Validating the Row count...")
       startTime = getTimestamp()
       // check the row count.
       val sourceCount = sourceData.count()
       val targetCount = targetRawData.count()
       endTime = getTimestamp()
       if (sourceCount != targetCount){
           println("Updating RowCountValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
           // Force exit here if needed
           //sys.exit(1)
         }
       else{
           println("Updating RowCountValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
         }


    // Test 3 - Validate the data
    println("Comparing source and target data...")
    startTime = getTimestamp()
    if (sourceDataFrame.except(targetRawData).count > 0 ){
        endTime = getTimestamp()
        // Update the result in the table
        println("Updating DataValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed DataMatch validation")
           // Force exit here if needed
          // sys.exit(1)
         }
       else{
           endTime = getTimestamp()
           println("Updating DataValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed DataMatch validation")
         }

    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - String length validation

  def UpdateResult(tableName: String, startTime: String, endTime: String, returnCode: Int, description: String){
    val insertString = s"INSERT INTO TABLE TestResult VALUES( FROM_UNIXTIME(UNIX_TIMESTAMP()),'$startTime','$endTime','$tableName',$returnCode,'$description')"
    val a = hc.sql(insertString)

    }


  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
        )
    )
  }

  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToDate(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
    }


  def convertToTimestamp(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

   def convertToDate(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

    def flatten(scheme: StructType): Array[StructField] = scheme.fields.flatMap { f =>
      f.dataType match {
      case struct:StructType => flatten(struct)
      case _ => Array(f)
       }
      }

    def getTimestamp(): String = {
        val now = java.util.Calendar.getInstance()
        val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        timestampFormat.format(now.getTime())
    }

The exception is below:

17/12/21 05:18:40 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(8,0,ShuffleMapTask,TaskKilled(stage cancelled),org.apache.spark.scheduler.TaskInfo@78db3052,null)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 8.0 failed 1 times, most recent failure: Lost task 17.0 in stage 8.0 (TID 323, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2430)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2429)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
  ... 53 elided
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

scala> 17/12/21 05:18:40 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
java.io.IOException: Failed to delete: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

Recommended answer

Your Spark process is spending too much time in garbage collection: most of the CPU cycles are consumed by GC and the processing never completes, because the executors are running out of memory. You can try the options below.

  • Tune the properties spark.storage.memoryFraction and spark.memory.storageFraction. You can also raise the resources given to the job on the command line, e.g. spark-submit ... --executor-memory 4096m --num-executors 20 (see the sketch after this list).
  • Or change the GC policy: check the current GC settings and switch to the G1 collector with -XX:+UseG1GC.
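
The sketch below shows one way the suggested settings could be applied; it is a minimal illustration only. The memory size, executor count and GC flag are the example values from the answer, not tuned recommendations, and since the question is run from spark-shell the memory settings are usually more reliable when passed on the command line at launch than when set programmatically after the JVMs have already started.

    // Sketch only: illustrative values, adjust to your cluster.
    // Command-line equivalent (preferred for spark-shell, because the driver and
    // executor JVMs are sized at launch time):
    //   spark-shell --executor-memory 4096m --num-executors 20 \
    //     --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
    import org.apache.spark.sql.SparkSession

    val spark: SparkSession = SparkSession.builder()
      .appName("Simple Application")
      .config("spark.executor.memory", "4096m")                   // more heap per executor
      .config("spark.executor.instances", "20")                   // more executors (YARN mode)
      .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")  // switch executors to the G1 collector
      .getOrCreate()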

That concludes this article on the Spark GC overhead limit exceeded error message. Hopefully the recommended answer is helpful.
