我坚持一些存储在var中的数据框。现在,当该var的值更改时,持久性如何工作?例如:

var checkedBefore_c = AddressValidation.validateAddressInAI(inputAddressesDF, addressDimTablePath, target_o, target_c, autoSeqColName).distinct.filter(col(CommonConstants.API_QUALITY_RATING) >= minQualityThreshold)
checkedBefore_c.persist(StorageLevel.MEMORY_AND_DISK_SER)

var pre_checkedBefore_c = checkedBefore_c.except(checkedBefore_o)
pre_checkedBefore_c.persist(StorageLevel.MEMORY_AND_DISK_SER)

checkedBefore_c = pre_checkedBefore_c.drop(target_o).drop(autoSeqColName)
          .withColumn(target_o, pre_checkedBefore_c(target_c))
          .withColumn(CommonConstants.API_STATUS, lit("AI-INSERT"))
          .withColumn(CommonConstants.API_ERROR_MESSAGE, lit(""))

checkedBefore_c = CommonUtils.addAutoIncremetColumn(checkedBefore_c, autoSeqColName)
checkedBefore_c = checkedBefore_c.select(addDimWithLoggingSchema.head, addDimWithLoggingSchema.tail: _*)
checkedBefore_c.persist(StorageLevel.MEMORY_AND_DISK_SER)

最佳答案

您正在尝试保持checkedBefore_c DataFrame,但是在您的代码中您尚未调用任何操作。

简要说明

Spark具有两种类型的操作:转换和操作。

转换:转换延迟完成,例如map,reduceByKey等。

操作:急于执行操作,例如foreach,count,save等。

持久性和缓存也是一种惰性操作,因此在您调用操作之前,持久性和缓存将不会执行。

有关更多详细信息,请参阅Spark中的Action。您也可以引用this

现在如何持续工作。
在持久状态下,将 Spark 存储分区存储在内存或磁盘中,或同时存储在两者中。
它们是各种选项,有关所有选项,请引用org.apache.spark.storage.StorageLevel源代码。

每个执行者将负责存储其分区,如果给出了in memory选项,则首先将尝试适合所有分区(如果不适合),然后将基于旧的缓存数据进行清理(这是LRY缓存)。如果所有分区仍不能容纳在内存中,它将缓存适合内存的分区,其余部分将保留。

如果选择了“带磁盘的内存”选项,则它将首先执行上述所有步骤,然后将左分区存储在本地磁盘中。

如果复制因子为2,则每个分区将缓存在两个不同的执行程序中。

在这种情况下,您已经传递了MEMORY_AND_DISK_SER,这意味着所有对象将在缓存之前进行序列化。默认情况下,使用Java序列化,但是您可以覆盖它并使用建议的Kyro序列化。

关于scala - 持久性在Spark中的工作方式,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/51896851/

10-12 23:07