我阅读了 HashPartitioner 的文档。不幸的是,除了API调用外,没有太多解释。我假设HashPartitioner基于键的哈希对分布式集进行分区。例如,如果我的数据是

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

因此,分区程序会将其放入不同的分区中,而相同的密钥位于同一分区中。但是我不明白构造函数参数的重要性
new HashPartitoner(numPartitions) //What does numPartitions do?

对于上述数据集,如果我这样做,结果将如何不同
new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

那么HashPartitioner实际如何工作?

最佳答案

好吧,让您的数据集稍微有趣一点:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

我们有六个要素:

rdd.count

Long = 6

没有分区:

rdd.partitioner

Option[org.apache.spark.Partitioner] = None

和八个分区:

rdd.partitions.length

Int = 8

现在让我们定义小助手来计算每个分区的元素数量:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

由于没有分区器,因此我们的数据集在分区之间是均匀分布的(Default Partitioning Scheme in Spark):

countByPartition(rdd).collect()

Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

scala - HashPartitioner如何工作?-LMLPHP

现在让我们重新划分数据集:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

由于传递给HashPartitioner的参数定义了分区数,因此我们期望一个分区:

rddOneP.partitions.length

Int = 1

由于我们只有一个分区,因此它包含所有元素:

countByPartition(rddOneP).collect

Array[Int] = Array(6)

scala - HashPartitioner如何工作?-LMLPHP

请注意,混洗后的值顺序是不确定的。

如果我们使用HashPartitioner(2),方法相同

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

我们将获得2个分区:

rddTwoP.partitions.length

Int = 2

由于rdd是按键数据分区的,因此将不再均匀分布:

countByPartition(rddTwoP).collect()

Array[Int] = Array(2, 4)

因为具有三个键,并且hashCode mod numPartitions只有两个不同的值,所以这里没有什么意外的:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))

scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

只是为了确认以上内容:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()

Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

scala - HashPartitioner如何工作?-LMLPHP

最后,通过HashPartitioner(7),我们得到七个分区,三个非空分区,每个分区有2个元素:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length

Int = 7

countByPartition(rddTenP).collect()

Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

scala - HashPartitioner如何工作?-LMLPHP

摘要和注释
  • HashPartitioner采用单个参数来定义分区数量
  • 使用键的hash
  • 值分配给分区。 hash函数可能因语言而异(Scala RDD可能使用hashCodeDataSets使用MurmurHash 3,PySpark和 portable_hash )。

    在这样的简单情况下,其中key是一个小整数,您可以假定hash是一个标识(i = hash(i))。

    Scala API使用 nonNegativeMod 根据计算所得的哈希
  • 确定分区
  • 如果键的分配不均匀,则可能会遇到部分集群空闲的情况。
  • 键必须是可哈希的。您可以检查我的A list as a key for PySpark's reduceByKey答案,以了解有关PySpark特定问题的信息。 HashPartitioner documentation突出了另一个可能的问题:

  • 在Python 3中,您必须确保哈希是一致的。参见What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?
  • 哈希分区程序既不是内射键也不是外射键。可以将多个键分配给单个分区,并且某些分区可以保留为空。
  • 请注意,当与REPL定义的案例类(Case class equality in Apache Spark)结合使用时,当前基于散列的方法在Scala中不起作用。
  • HashPartitioner(或任何其他Partitioner)将数据随机播放。除非在多个操作之间重用分区,否则它不会减少要重排的数据量。
  • 08-28 05:05