I am trying to tune the following process, because I am hitting a Java heap space error.

Looking at the Spark UI, there is a cogroup whose behavior is very strange.
Everything looks perfectly balanced before this stage (for now I have hard-coded the number of partitions to 48). Inside the method loadParentMPoint there is the cogroup transformation; essentially, when the next count is about to be executed, the cogroup is computed and 48 tasks are scheduled, but 47 of them terminate immediately (apparently there is nothing for them to process), except for the one that starts doing shuffle read and keeps going until it fills the heap space and an exception is raised.

I have launched the process several times with the same dataset and the outcome is always the same. Every time only one executor actually does any work.

Why do I get this behavior? Am I missing something? I tried to repartition the data before the cogroup because I supposed it was unbalanced, but it did not help; the same happened when I tried partitionBy.
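
For reference, the partitioning attempt looked roughly like this (a minimal sketch, using the rddNew and rddRegistryFactoryParent names from the excerpt below; the explicit HashPartitioner(48) is an assumption about the exact call used):

    import org.apache.spark.HashPartitioner

    // Sketch: co-partition both sides with the same partitioner before the cogroup,
    // hoping the shuffle would spread the keys across all 48 partitions.
    val partitioner = new HashPartitioner(48)
    val leftPartitioned  = rddNew.partitionBy(partitioner)
    val rightPartitioned = rddRegistryFactoryParent.partitionBy(partitioner)
    val cogrouped = leftPartitioned.cogroup(rightPartitioned)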

This is a code excerpt:

    class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler {

    implicit val ctx = sc
    val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess])
    val ipc = new Handler[ConsumptionComputationBigDataIPC]
    val billingOrderDao = new Handler[BillingOrderDao]
    val mPointDao = new Handler[MeasurementPointDAO]
    val billingOrderBDao = new Handler[BillingOrderBDAO]
    val ccmDiscardBdao = new Handler[CCMDiscardBDAO]
    val ccmService = new Handler[ConsumptionComputationBillingService]
    val registry = new Handler[IncrementalRegistryTableData]
    val podTimeZoneHelper = new Handler[PodDateTimeUtils]
    val billingPodStatusDao = new Handler[BillingPodStatusBDAO]
    val config = new Handler[PropertyManager]
    val paramFacade = new Handler[ConsumptionParameterFacade]
    val consumptionMethods = new Handler[ConsumptionMethods]
    val partitions = config.get.defaultPartitions()
    val appName = sc.appName
    val appId = sc.applicationId
    val now = new DateTime

    val extracted = ctx.accumulator(0l, "Extracted from planning")
    val generated = ctx.accumulator(0l, "Billing orders generated")
    val discarded = ctx.accumulator(0l, "Billing orders discarded")

    // initialize staging
    val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea())
    staging.prepareReading

    val rddExtractedFromPlanning = staging
        .read[ExtractedPO]()
        .repartition(48)
        .setName("rddExtractedFromPlanning")
        .cache

    val rddExtracted = rddExtractedFromPlanning
      .filter { x =>
        extracted += 1
        (x.getExtracted == EExtractedType.EXTRACTED ||
         x.getExtracted == EExtractedType.EXTRACTED_BY_USER ||
         x.getExtracted == EExtractedType.EXTRACTED_BY_TDC)
      }
      .map { x =>
        log.info("1:extracted>{}", x)
        val bo = MapperUtil.mapExtractedPOtoBO(x)
        bo
      }

    val podWithExtractedAndLastBillingOrderPO = rddExtracted.map { e =>
      val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate())
      val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr)
      log.info("2:last Billing order>{}", last);
      (e.getPod, e, last)
    }
      .setName("podWithExtractedAndLastBillingOrderPO")
      .cache()

    val podWithExtractedAndLastBillingOrder = podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3))))

    val rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO
      .map(e => (e._1,1))
      .reduceByKey(_+_)
      .keys

    val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List())

    val rddExtractedWithMPoint = ConsumptionComputationUtil
      .groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory)
      .filter{ e =>
        val mPoint = e._3
        val condition = mPoint != null
        condition match {
          case false => log.error("MPoint is NULL for POD -> " + e._1)
          case true =>
        }
        condition
      }
      .setName("rddExtractedWithMPoint")
      .cache

    rddExtractedWithMPoint.count

    val rddExtractedWithMPointWithParent = ConsumptionComputationUtil
      .groupWithParent(rddExtractedWithMPoint)
      .map{
        case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) =>
          if (!parentMpointId.isEmpty) {
            val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get)
            log.info("2.1:parentMpoin>Mpoint=" + parentMpointId + " parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory)
          } else {
            log.info("2.1:parentMpoin>Mpoint=null parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, null, factory)
          }
      }
      .setName("rddExtractedWithMPointWithParent")
      .cache()

    rddExtractedWithMPointWithParent.count

    val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent
      .filter(e => Option(e._5).isDefined)
      .map(e => (e._5,1))
      .reduceByKey(_+_)
      .keys

    rddRegistryFactoryParentKeys.count

    val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List())

    rddRegistryFactoryParent.count

    val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder]

    val rddNew = rddExtractedWithMPointWithParent.map({
      case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) =>
        (parentPod, (pod, extracted, measurementPoint, billingOrder, factory))
    })
    rddNew.count

    val p = rddNew.cogroup(rddRegistryFactoryParent)
    p.count

    val rddExtractedWithMPointWithMpointParent = p
      .filter { case (pod, (inputs, mpFactories)) => inputs.nonEmpty }
      .flatMap { case (pod, (inputs, mpFactories)) =>
        val factory = mpFactories.headOption // possibly one factory, or none
        val results = inputs.map { e =>
          val measurementPointTupla = factory.flatMap { f =>
            Option(imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f)
          }
          val tupla = measurementPointTupla.getOrElse(null)
          val toBeBilled = if (tupla != null && tupla._1 != null) false else true
          val m = if (tupla != null && tupla._1 != null) tupla._1 else null
          val f = if (tupla != null && tupla._2 != null) tupla._2 else null
          (e._1, e._2, e._3, e._4, m, toBeBilled, e._5, f)
        }
        results
      }
      .setName("rddExtractedWithMPointWithMpointParent")
      .cache()

    rddExtractedWithMPointWithMpointParent.foreach({ e =>
      log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1)
    })
}


These are the stages of the two RDDs that take part in the cogroup. rddNew:

[Spark UI screenshot: stage for rddNew]

rddRegistryFactory:

[Spark UI screenshot: stage for rddRegistryFactory]

This is the stage of the cogroup:

[Spark UI screenshot: cogroup stage]

This is the storage situation:

[Spark UI screenshot: Storage tab]

This is the Executors tab:

[Spark UI screenshot: Executors tab]

N.B. I added the count actions only for debugging purposes.

Update:


I tried removing the cache and launching the process again; now each executor has around 100 MB used for storing data, but the behavior is the same: the shuffle read happens on only one executor.
I also tried a join between the same two RDDs that go into the cogroup, just to find out whether the problem is specific to cogroup or common to all wide transformations; with the join the behavior was exactly the same.
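
For completeness, the join variant looked roughly like this (a sketch only, reusing the rddNew and rddRegistryFactoryParent names from the excerpt above):

    // Sketch: the same two pair RDDs, joined instead of cogrouped;
    // the single-executor shuffle read showed up here as well.
    val joined = rddNew.join(rddRegistryFactoryParent)
    joined.count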

Best answer

I strongly believe this Java heap space error is caused by the cached RDDs, which, judging from your last screenshot (the Storage tab), seem unnecessary.

[Spark UI screenshot: Storage tab]
Depending on how many times a dataset is accessed and on the amount of work involved in doing so, recomputation can be faster than the price paid by the increased memory pressure.
It should go without saying that if you only read a dataset once there is no point in caching it; it will actually make your job slower.
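
As an illustration, a minimal sketch of that change (reusing the RDD names from your code; unpersist() is the standard RDD call for releasing a cached dataset, and where exactly to place it is up to you):

    // Sketch: drop cache() on RDDs that are traversed only once, and release
    // the ones that are genuinely reused as soon as they are no longer needed.
    rddExtractedWithMPoint.count
    // ... downstream stages that reuse rddExtractedWithMPoint ...
    rddExtractedWithMPoint.unpersist()   // frees executor storage memory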

For the debugging counts you can use countApprox() instead of count. Once testing is done you can remove it and actually run your job.
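
For instance (a sketch; the 10-second timeout, the 0.95 confidence and the choice of rddNew are arbitrary):

    // Sketch: approximate count with a time budget, good enough for a debug check.
    // countApprox returns a PartialResult; initialValue holds the current estimate.
    val approx = rddNew.countApprox(timeout = 10000L, confidence = 0.95)
    log.info("rddNew approximate count -> " + approx.initialValue.mean)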

Most important of all, make sure your data is uniform by printing the number of records per partition; if needed, you can repartition or coalesce.
You can get the number of records per partition like this:


    // counts records per partition (toDF needs `import spark.implicits._` in scope)
    df
      .rdd
      .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
      .toDF("partition_number", "number_of_records")
      .show
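
If those counts turn out to be heavily skewed, a repartition or coalesce can follow; a sketch, where df stands for any DataFrame and the partition counts are arbitrary:

    // Sketch: rebalance after inspecting the per-partition counts.
    val rebalanced = df.repartition(48)   // full shuffle, evenly hashed partitions
    val narrowed   = df.coalesce(12)      // merges partitions without a full shuffle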
