Problem description
According to the documentation, it is possible to tell Spark to keep track of "out of scope" checkpoints (those that are not needed anymore) and clean them from disk:
SparkSession.builder
...
.config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
.getOrCreate()
Apparently it does so; the problem, however, is that the last checkpointed RDDs are never deleted.
- Is there any configuration I am missing so that all of the cleanup is performed?
- If not: is there any way to get the name of the temporary folder created for a particular application so I can delete it programmatically? I.e., get 0c514fb8-498c-4455-b147-aff242bd7381 from the SparkContext the same way I can get the applicationId.
Recommended answer
I know it's an old question, but recently I was exploring checkpoint and had similar problems. I'd like to share the findings.
Setting spark.cleaner.referenceTracking.cleanCheckpoints=true works sometimes, but it is hard to rely on it. The official documentation says that by setting this property the checkpoint files are cleaned when the reference is out of scope.
I don't know what exactly that means, because my understanding is that once the Spark session/context is stopped, it should be cleaned up.
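One plausible explanation (my own reading of Spark's reference-tracking cleaner, not something the documentation spells out) is that checkpoint files are only removed once the corresponding RDD object is garbage collected, so any RDD that is still referenced when the application stops never gets its checkpoint cleaned. A minimal, self-contained Scala sketch to test that hypothesis (the object name and the explicit System.gc() call are purely illustrative):

import org.apache.spark.sql.SparkSession

object CheckpointCleanupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setCheckpointDir("hdfs:///tmp/checkpoint/")

    var rdd = sc.parallelize(1 to 1000).map(_ * 2)
    rdd.checkpoint()
    rdd.count()        // materializes the checkpoint on disk

    rdd = null         // drop the only reference to the checkpointed RDD
    System.gc()        // cleanup is reference/GC driven, so force a collection
    Thread.sleep(5000) // give the asynchronous ContextCleaner time to run

    // The checkpoint of the dropped RDD may be gone by now; any RDD still
    // referenced at shutdown keeps its checkpoint files on disk.
    spark.stop()
  }
}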
However, I found an answer to your second question (how to get the checkpoint folder for a specific application programmatically).
Yes, we can get the checkpointed directory like below:
Scala:
// Set the checkpoint directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")
scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3
// It returns a String, so we can use org.apache.hadoop.fs to delete the path
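The Scala part above stops at retrieving the path; a short sketch of the deletion step the comment alludes to, using the Hadoop FileSystem API on the String returned by getCheckpointDir (spark is assumed to be the active SparkSession):

import org.apache.hadoop.fs.Path

// Resolve the FileSystem from the path itself so the right scheme (hdfs://) is used
val checkpointDir = new Path(spark.sparkContext.getCheckpointDir.get)
val fs = checkpointDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

if (fs.exists(checkpointDir)) {
  fs.delete(checkpointDir, true)   // true = delete recursively
}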
PySpark:
# Set directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'
# Notice the 'u' at the start, which means it returns a unicode object
# Below are the steps to get the Hadoop FileSystem object and delete the path
>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
>>> fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True
>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True
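As a workaround for the original problem (the last checkpointed RDDs never being deleted), you can also run this deletion yourself right before stopping the session instead of relying on spark.cleaner.referenceTracking.cleanCheckpoints. A small Scala helper along these lines (the name stopAndCleanCheckpoints is made up, and it reuses the same Hadoop FileSystem calls shown above):

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.Path

// Delete this application's checkpoint folder (the UUID directory) and then
// stop the session; call this instead of spark.stop() at the end of the job.
def stopAndCleanCheckpoints(spark: SparkSession): Unit = {
  spark.sparkContext.getCheckpointDir.foreach { dir =>
    val path = new Path(dir)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    if (fs.exists(path)) fs.delete(path, true)   // recursive delete
  }
  spark.stop()
}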