
Question

I was reading the source code of Apache Spark and got stuck on the logic of RangePartitioner's sketch method. Can someone please explain to me what exactly this code is doing?

// spark/core/src/main/scala/org/apache/spark/Partitioner.scala

def sketch[K:ClassTag](rdd: RDD[K],
  sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {

  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  (numItems, sketched)
}

Answer

sketch is used in RangePartitioner to sample values in RDD partitions. That is, it uniformly and randomly picks and collects a small subset of element values from every RDD partition.

Note that sketch is used as part of RangePartitioner to figure out the range bounds that produce approximately equally sized partitions. Other cool things happen elsewhere in the RangePartitioner code, e.g. when it calculates the required size of the sample subset (sampleSizePerPartition).
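To illustrate that range-bounds step, here is a deliberately simplified, hypothetical sketch of how weighted sample candidates can be turned into bounds. The real `RangePartitioner.determineBounds` follows the same idea, but weights each candidate by how many partition elements it represents and re-samples heavily skewed partitions; the `determineBounds` below is my standalone illustration, not Spark's actual code:

```scala
// Hypothetical simplified sketch: pick `partitions - 1` bounds from weighted
// sample candidates so that each resulting range carries roughly equal weight.
def determineBounds(candidates: Seq[(Int, Float)], partitions: Int): Seq[Int] = {
  val ordered = candidates.sortBy(_._1)           // bounds must be in key order
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions              // target weight per range
  val bounds = scala.collection.mutable.ArrayBuffer.empty[Int]
  var cumWeight = 0.0
  var target = step
  for ((key, weight) <- ordered if bounds.length < partitions - 1) {
    cumWeight += weight
    if (cumWeight >= target) {
      bounds += key      // this key closes the current range
      target += step
    }
  }
  bounds.toSeq
}
```

For example, with candidates 1..100 each carrying weight 1.0 and 4 partitions, this yields the bounds 25, 50, 75, splitting the key space into four roughly equal ranges.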

See my comments within the code for a step-by-step explanation.

def sketch[K:ClassTag](rdd: RDD[K],
  sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {

  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  // run the sampling function on every partition
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    // the partition number `idx` and rdd.id are combined into a unique seed
    // for every partition, so that elements are selected independently in each partition
    val seed = byteswap32(idx ^ (shift << 16))
    // randomly select a sample of up to sampleSizePerPartition elements and
    // count the total number of elements in the partition.
    // what is cool about reservoir sampling is that it does this in a single
    // pass - O(N), where N is the number of elements in the partition
    // see http://en.wikipedia.org/wiki/Reservoir_sampling
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  // return the total count of elements in the RDD along with the samples
  (numItems, sketched)
}
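The single-pass reservoir sampling step can itself be sketched as follows. This is a minimal standalone version I wrote for illustration; Spark's actual `SamplingUtils.reservoirSampleAndCount` has the same shape but uses its own random number generator:

```scala
import scala.reflect.ClassTag
import scala.util.Random

// Simplified sketch of single-pass reservoir sampling: keep at most `k`
// elements chosen uniformly at random, and count all elements along the way.
// (Standalone illustration, not Spark's actual implementation.)
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long): (Array[T], Long) = {
  val rand = new Random(seed)
  val reservoir = new Array[T](k)
  var count = 0L
  while (input.hasNext) {
    val item = input.next()
    if (count < k) {
      // fill the reservoir with the first k elements
      reservoir(count.toInt) = item
    } else {
      // replace a random slot with probability k / (count + 1), which keeps
      // every element seen so far equally likely to be in the reservoir
      val j = (rand.nextDouble() * (count + 1)).toLong
      if (j < k) reservoir(j.toInt) = item
    }
    count += 1
  }
  // if the partition had fewer than k elements, trim the unused slots
  if (count < k) (reservoir.take(count.toInt), count) else (reservoir, count)
}
```

Because each incoming element either fills an empty slot or evicts a uniformly chosen one with probability k / (count + 1), every element of the partition ends up in the sample with equal probability, and only one pass over the iterator is needed.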
