Problem description
I have an iterative application running on Spark that I simplified to the following code:
var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize((0 to 1000))
var c: Long = Int.MaxValue
var iteration: Int = 0
while (c > 0) {
  iteration += 1
  // Manipulate the RDD and cache the new RDD
  anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache() //.localCheckpoint()
  // Actually compute the RDD and spawn a new job
  c = anRDD.count()
  println(s"Iteration: $iteration, Values: $c")
}
What happens to the memory allocation across subsequent jobs?
- Does the current anRDD "override" the previous ones, or are they all kept in memory? In the long run, this could throw a memory exception.
- Do localCheckpoint and cache have different behaviors? If localCheckpoint is used in place of cache, since localCheckpoint truncates the RDD lineage, I would expect the previous RDDs to be overridden.
Recommended answer
Unfortunately, it seems that Spark is not a good fit for things like this.
Your original implementation is not viable, because on each iteration the newer RDD holds an internal reference to the older one, so all of the RDDs pile up in memory.
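A common mitigation, sketched below, is to keep a handle on the previous RDD and unpersist() it once the new one has been materialized. This sketch assumes sc is the usual spark-shell SparkContext, and the previous variable is illustrative, not part of the original code:

var anRDD = sc.parallelize(0 to 1000)
var c: Long = Int.MaxValue
var iteration = 0
while (c > 0) {
  iteration += 1
  val previous = anRDD                  // keep a handle on the old RDD
  anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache()
  c = anRDD.count()                     // materialize the new RDD first
  previous.unpersist()                  // then release the old cached blocks
  println(s"Iteration: $iteration, Values: $c")
}

Note that unpersist() only frees the cached blocks; the lineage chain still grows with every iteration, which is why the options below come into play.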
localCheckpoint is an approximation of what you are trying to achieve. It does truncate the RDD's lineage, but you lose fault tolerance; this is clearly stated in the documentation for the method.
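A minimal sketch of that variant, assuming the same loop as in the question: replacing cache() with localCheckpoint() cuts the lineage at each iteration, at the cost of losing the ability to recompute lost partitions.

// Inside the loop: localCheckpoint() truncates lineage, but the data lives
// only on the executors, so a lost executor makes the RDD unrecoverable.
anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).localCheckpoint()
c = anRDD.count() // the action materializes the local checkpoint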
checkpoint is also an option. It is safe, but it dumps the data to HDFS on each iteration.
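A minimal sketch of the reliable-checkpoint variant; the checkpoint directory below is a hypothetical path, and the cache() call follows Spark's recommendation to persist an RDD before checkpointing so it is not recomputed when written out.

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // hypothetical directory

var anRDD = sc.parallelize(0 to 1000)
var c: Long = Int.MaxValue
var iteration = 0
while (c > 0) {
  iteration += 1
  anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache()
  anRDD.checkpoint() // mark for checkpointing before the first action
  c = anRDD.count()  // runs the job, then writes the checkpoint to HDFS
  println(s"Iteration: $iteration, Values: $c")
}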
Consider redesigning the approach. Hacks like this can bite sooner or later.
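As one possible redesign (not from the original answer), assuming the per-iteration data is small enough for the driver, the iterative step can run locally so that no chain of RDDs is built at all:

// Hypothetical redesign: collect once, then iterate on the driver.
var values = sc.parallelize(0 to 1000).collect().toSeq
var iteration = 0
while (values.nonEmpty) {
  iteration += 1
  values = values.zipWithIndex.collect { case (v, i) if i % 2 == 1 => v }
  println(s"Iteration: $iteration, Values: ${values.length}")
}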