本文介绍了PySpark:全面清洁检查站的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据文档可以告诉Spark跟踪超出范围" 检查点-不再需要的检查点-并从磁盘中清除它们.

According the documentation is possible to tell Spark to keep track of "out of scope" checkpoints - those that are not needed anymore - and clean them from disk.

SparkSession.builder
  ...
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()

显然是这样做的,但是问题是,最后一个被检查的rdds从未被删除.

Apparently it does so but the problem, however, is that the last checkpointed rdds are never deleted.

  • 执行所有清理操作时是否缺少任何配置?
  • 如果没有:有什么方法可以获取为特定应用程序创建的临时文件夹的名称,以便我可以通过编程方式将其删除?IE.以与获取 applicationId
  • 相同的方式从 SparkContext 获取 0c514fb8-498c-4455-b147-aff242bd7381

推荐答案

我知道它的老问题,但是最近我正在使用 checkpoint 进行探索,并且遇到了类似的问题.希望分享调查结果.

I know its old question but recently i was exploring on checkpoint and had similar problems. Would like to share the findings.

设置 spark.cleaner.referenceTracking.cleanCheckpoints = true 有时会起作用,但很难依靠它.官方文件说,通过设置此属性

Setting spark.cleaner.referenceTracking.cleanCheckpoints=true is working sometime but its hard to rely on it. official document says that by setting this property

我不知道这到底意味着什么,因为我的理解是,一旦火花会话/上下文停止,就应该清理它.

I don't know what exactly it means because my understanding is once spark session/context is stopped it should clean it.

但是 ,我找到了您以下问题的答案

However, I found a answer to your below question

,我们可以得到如下所示的 checkpointed 目录:

Yes, We can get the checkpointed directory like below:

scala:

//Set directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")

scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3

//It gives String so we can use org.apache.hadoop.fs to delete path

PySpark:

// Set directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'

// notice 'u' at the start which means It returns unicode object
// Below are the steps to get hadoop file system object and delete

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

这篇关于PySpark:全面清洁检查站的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-15 19:12