spark checkpoint和持久保存到磁盘之间有什么区别。这些都存储在本地磁盘中吗?

最佳答案

几乎没有什么重要的区别,但最根本的区别是沿袭会发生什么。 Persist / cache保持血统不变,而checkpoint破坏血统。让我们考虑以下示例:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
  • cache / persist:

    val indCache  = rdd.mapValues(_ > 4)
    indCache.persist(StorageLevel.DISK_ONLY)
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
    indCache.count
    // 3
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |       CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
  • checkpoint:

    val indChk  = rdd.mapValues(_ > 4)
    indChk.checkpoint
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 []
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 []
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []
    
    indChk.count
    // 3
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ReliableCheckpointRDD[12] at count at <console>:27 []
    

  • 如您所见,在第一种情况下,即使从缓存中获取了数据,沿袭也得以保留。这意味着,如果indCache的某些分区丢失,则可以从头开始重新计算数据。在第二种情况下,在检查点之后,谱系完全丢失,并且indChk不再包含重建它所需的信息。
    checkpointcache / persist不同,它是与其他作业分开计算的。这就是为什么应缓存标记为检查点的RDD的原因:



    最后,checkpointed数据是持久性的,并且在SparkContext被销毁后不会删除。

    关于SparkContext.setCheckpointDir使用的数据存储RDD.checkpoint,如果在非本地模式下运行,则需要DFS路径。否则,它也可以是本地文件系统。无需复制的localCheckpointpersist应该使用本地文件系统。

    重要说明:

    RDD检查点与Spark Streaming中的检查点不同。前者旨在解决沿袭问题,后一个问题全与流可靠性和故障恢复有关。

    关于apache-spark - spark checkpoint和持久存储到磁盘之间有什么区别,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35127720/

    10-13 00:05