I am trying to tune the following job because I hit a Java heap space error.
Looking at the Spark UI, there is a cogroup that behaves very strangely.
Before that stage everything looks well balanced (for now I have hard-coded the number of partitions to 48). Inside the method loadParentMPoint there is the cogroup transformation. Basically, when the next count is about to execute, the cogroup is computed and 48 tasks are scheduled, but 47 of them terminate immediately (they appear to have nothing to process), while the remaining one starts doing shuffle read until it fills up the heap space and throws the exception.
I have launched the job several times with the same dataset, and the outcome is always the same: every time only one executor does the work.
Why do I see this behavior? Am I missing something? I tried to repartition the data before the cogroup, because I assumed it was unbalanced, but it did not help; the same happened when I tried partitionBy.
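One way to verify the skew hypothesis is to print the heaviest keys of the RDD that feeds the cogroup. A minimal diagnostic sketch, assuming the pair RDD rddNew from the excerpt below (it requires the job's SparkContext to run):

```scala
// Sketch: print the 20 heaviest keys of the RDD feeding the cogroup; a
// single dominant key would explain why one task receives the whole
// shuffle read while the other 47 finish immediately.
val keyHistogram = rddNew
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)

keyHistogram
  .top(20)(Ordering.by(_._2))
  .foreach { case (key, n) => println(s"$key -> $n records") }
```

If one key (e.g. a null or default parentPod) accounts for most of the records, the cogroup itself is not at fault: hash partitioning sends all records with that key to a single task.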
Here is a code excerpt:
class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler {

  implicit val ctx = sc
  val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess])
  val ipc = new Handler[ConsumptionComputationBigDataIPC]
  val billingOrderDao = new Handler[BillingOrderDao]
  val mPointDao = new Handler[MeasurementPointDAO]
  val billingOrderBDao = new Handler[BillingOrderBDAO]
  val ccmDiscardBdao = new Handler[CCMDiscardBDAO]
  val ccmService = new Handler[ConsumptionComputationBillingService]
  val registry = new Handler[IncrementalRegistryTableData]
  val podTimeZoneHelper = new Handler[PodDateTimeUtils]
  val billingPodStatusDao = new Handler[BillingPodStatusBDAO]
  val config = new Handler[PropertyManager]
  val paramFacade = new Handler[ConsumptionParameterFacade]
  val consumptionMethods = new Handler[ConsumptionMethods]

  val partitions = config.get.defaultPartitions()
  val appName = sc.appName
  val appId = sc.applicationId
  val now = new DateTime

  val extracted = ctx.accumulator(0L, "Extracted from planning")
  val generated = ctx.accumulator(0L, "Billing orders generated")
  val discarded = ctx.accumulator(0L, "Billing orders discarded")

  // initialize staging
  val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea())
  staging.prepareReading

  val rddExtractedFromPlanning = staging
    .read[ExtractedPO]()
    .repartition(48)
    .setName("rddExtractedFromPlanning")
    .cache

  val rddExtracted = rddExtractedFromPlanning
    .filter { x =>
      extracted += 1
      (x.getExtracted == EExtractedType.EXTRACTED ||
       x.getExtracted == EExtractedType.EXTRACTED_BY_USER ||
       x.getExtracted == EExtractedType.EXTRACTED_BY_TDC)
    }
    .map { x =>
      log.info("1:extracted>{}", x)
      val bo = MapperUtil.mapExtractedPOtoBO(x)
      bo
    }

  val podWithExtractedAndLastBillingOrderPO = rddExtracted
    .map { e =>
      val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate())
      val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr)
      log.info("2:last Billing order>{}", last)
      (e.getPod, e, last)
    }
    .setName("podWithExtractedAndLastBillingOrderPO")
    .cache()

  val podWithExtractedAndLastBillingOrder =
    podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3))))

  val rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO
    .map(e => (e._1, 1))
    .reduceByKey(_ + _)
    .keys

  val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List())

  val rddExtractedWithMPoint = ConsumptionComputationUtil
    .groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory)
    .filter { e =>
      val mPoint = e._3
      val condition = mPoint != null
      if (!condition) log.error("MPoint is NULL for POD -> " + e._1)
      condition
    }
    .setName("rddExtractedWithMPoint")
    .cache

  rddExtractedWithMPoint.count

  val rddExtractedWithMPointWithParent = ConsumptionComputationUtil
    .groupWithParent(rddExtractedWithMPoint)
    .map {
      case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) =>
        if (!parentMpointId.isEmpty) {
          val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get)
          log.info("2.1:parentMpoint>Mpoint=" + parentMpointId + " parent for pod -> " + pod)
          (pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory)
        } else {
          log.info("2.1:parentMpoint>Mpoint=null parent for pod -> " + pod)
          (pod, extracted, measurementPoint, billOrder, null, factory)
        }
    }
    .setName("rddExtractedWithMPointWithParent")
    .cache()

  rddExtractedWithMPointWithParent.count

  val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent
    .filter(e => Option(e._5).isDefined)
    .map(e => (e._5, 1))
    .reduceByKey(_ + _)
    .keys

  rddRegistryFactoryParentKeys.count

  val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List())

  rddRegistryFactoryParent.count

  val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder]

  val rddNew = rddExtractedWithMPointWithParent.map {
    case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) =>
      (parentPod, (pod, extracted, measurementPoint, billingOrder, factory))
  }
  rddNew.count

  val p = rddNew.cogroup(rddRegistryFactoryParent)
  p.count

  val rddExtractedWithMPointWithMpointParent = p
    .filter { case (pod, (inputs, mpFactories)) => inputs.nonEmpty }
    .flatMap { case (pod, (inputs, mpFactories)) =>
      val factory = mpFactories.headOption // possibly one factory, or none
      val results = inputs.map { e =>
        val measurementPointTupla = factory.flatMap { f =>
          Option((imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f))
        }
        val tupla = measurementPointTupla.getOrElse(null)
        val toBeBilled = if (tupla != null && tupla._1 != null) false else true
        val m = if (tupla != null && tupla._1 != null) tupla._1 else null
        val f = if (tupla != null && tupla._2 != null) tupla._2 else null
        (e._1, e._2, e._3, e._4, m, toBeBilled, e._5, f)
      }
      results
    }
    .setName("rddExtractedWithMPointWithMpointParent")
    .cache()

  rddExtractedWithMPointWithMpointParent.foreach { e =>
    log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1)
  }
}
These are the stages of the two RDDs taking part in the cogroup. rddNew:
rddRegistryFactory:
This is the stage of the cogroup:
This is the storage situation:
This is the executors tab:
N.B. I added the count actions only for debugging purposes.
Update:
I tried removing the cache and launching the job again; now every executor has about 100 MB used for storing data, but the behavior is the same: the shuffle read happens on only one executor.
I also tried a join between the same two RDDs before the cogroup, just to find out whether my problem is related only to cogroup or extends to all wide transformations, and the join showed exactly the same behavior.
Best answer
I strongly believe this Java heap space error is caused by the cached RDDs, which, judging from your last screenshot (the Storage tab), look unnecessary.
Depending on how many times a dataset is accessed and how much work recomputing it involves, recomputation can be faster than the price paid by the increased memory pressure.
It goes without saying that if a dataset is cached but read only once, caching actually makes your job slower.
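As a sketch of the alternative (names taken from the question's code; it runs inside the existing job): cache only the RDDs that are reused, and release them as soon as the last action that needed them has run, so executors get their storage memory back before the cogroup.

```scala
// Sketch: release a cached RDD once it is no longer needed downstream,
// freeing executor storage memory before the heavy cogroup stage runs.
rddExtractedWithMPoint.count
// ... all transformations that reuse rddExtractedWithMPoint ...
rddExtractedWithMPoint.unpersist()
```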
For the debug counts, you can use countApprox() instead of count. Once testing is finished, you can remove them so the job runs your actual work.
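A minimal sketch of that swap, using one of the question's RDDs (countApprox takes a timeout in milliseconds and a confidence level, and returns a partial result instead of forcing a full scan):

```scala
// Sketch: approximate count for debugging; gives up after 1 second and
// reports the best estimate obtained so far with 95% confidence.
val approx = rddNew.countApprox(timeout = 1000L, confidence = 0.95)
log.info("rddNew count ~ " + approx.initialValue.mean)
```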
On top of that, make sure your data is uniform by printing the number of records per partition; if needed, you can repartition or coalesce.
The number of records per partition can be obtained like this:
df
  .rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .toDF("partition_number", "number_of_records")
  .show
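If those counts confirm that a single hot key dominates, a common workaround is key salting. A hypothetical sketch (the helper name and the salt factor are mine; it assumes the registry side is small enough to be replicated salt times):

```scala
import org.apache.spark.rdd.RDD
import scala.util.Random

// Sketch: spread each key of the big side over `salt` random sub-keys, and
// replicate the small side once per sub-key, so the records of a hot key
// land on many tasks instead of one.
def saltedCogroup[K, V, W](big: RDD[(K, V)], small: RDD[(K, W)], salt: Int) = {
  val left  = big.map { case (k, v) => ((k, Random.nextInt(salt)), v) }
  val right = small.flatMap { case (k, w) => (0 until salt).map(i => ((k, i), w)) }
  left.cogroup(right)
}
```

Note that downstream logic then has to aggregate per original key again, and that salting only pays off when the skew is real; verify it first with the per-partition counts above.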