Question
I was reading the source code of Apache Spark and got stuck on the logic of RangePartitioner's sketch method. Can someone please explain what exactly this code is doing?
// spark/core/src/main/scala/org/apache/spark/Partitioner.scala
def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  (numItems, sketched)
}
Answer
sketch is used in RangePartitioner to sample the values in an RDD's partitions: that is, to pick and collect a small, uniformly random subset of element values from every partition.
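For intuition, here is a minimal, self-contained version of that sampling step in plain Scala. This is a hypothetical helper, not Spark's actual SamplingUtils code; it is specialized to Int (Spark's version is generic and needs a ClassTag) but implements the same single-pass reservoir sampling idea:

```scala
import scala.util.Random

object ReservoirDemo {
  // One pass over the iterator, O(N): keep the first k elements, then
  // replace a random reservoir slot with decreasing probability, so every
  // element ends up in the sample with equal probability k / N.
  // Returns (sample of up to k elements, total number of elements seen).
  def reservoirSampleAndCount(iter: Iterator[Int], k: Int, seed: Long): (Array[Int], Int) = {
    val rng = new Random(seed)
    val reservoir = new Array[Int](k)
    var n = 0 // total number of elements seen so far
    for (x <- iter) {
      if (n < k) {
        reservoir(n) = x
      } else {
        // x replaces a random slot with probability k / (n + 1)
        val j = rng.nextInt(n + 1)
        if (j < k) reservoir(j) = x
      }
      n += 1
    }
    (if (n < k) reservoir.take(n) else reservoir, n)
  }
}
```

Note that the caller gets both the sample and the exact partition size, which is why sketch can report numItems without a separate count pass.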
Note that sketch is only one part of RangePartitioner: its output is used to figure out the range bounds that produce approximately equally sized partitions. Other cool things happen elsewhere in the RangePartitioner code, e.g. where it calculates the required sample size per partition (sampleSizePerPartition).
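To see how the collected samples turn into range bounds, here is a deliberately simplified sketch. The rangeBounds helper below is illustrative only; Spark's real determineBounds additionally weights each sample by how many elements of its partition it represents, and re-samples partitions whose samples are too small:

```scala
object BoundsDemo {
  // Pick numPartitions - 1 split points at evenly spaced positions in the
  // sorted pooled sample. Keys below bounds(0) go to partition 0, keys
  // between bounds(0) and bounds(1) to partition 1, and so on, giving
  // approximately equally sized partitions.
  def rangeBounds(samples: Array[Int], numPartitions: Int): Array[Int] = {
    val sorted = samples.sorted
    (1 until numPartitions)
      .map(i => sorted(i * sorted.length / numPartitions))
      .toArray
  }
}
```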
See my comments in the code below for a step-by-step explanation.
def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  // Run the sampling function on every partition.
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    // The partition number `idx` and rdd.id are combined into a unique seed
    // for every partition, to ensure that elements are selected in a unique
    // manner for every partition.
    val seed = byteswap32(idx ^ (shift << 16))
    // Randomly select a sample of up to sampleSizePerPartition elements and
    // count the total number of elements in the partition.
    // What is cool about reservoir sampling is that it does this in a single
    // pass - O(N), where N is the number of elements in the partition.
    // See http://en.wikipedia.org/wiki/Reservoir_sampling
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  // Return the total count of elements in the RDD together with the
  // per-partition samples.
  (numItems, sketched)
}
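The seed line can even be tried in isolation, because byteswap32 comes from the Scala standard library (scala.util.hashing). The seedFor helper below is just an illustrative wrapper around the same expression used in sketch:

```scala
import scala.util.hashing.byteswap32

object SeedDemo {
  // Same formula as in sketch: the rdd id is shifted into the high bits,
  // XORed with the partition index in the low bits, and mixed by
  // byteswap32, so every (rdd, partition) pair gets its own deterministic
  // seed. Deterministic seeds make the sampling reproducible across reruns.
  def seedFor(rddId: Int, partitionIdx: Int): Int =
    byteswap32(partitionIdx ^ (rddId << 16))
}
```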