Question
I read up on the documentation of HashPartitioner. Unfortunately, nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example, if my data is like
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
the partitioner would put this into different partitions, with the same keys falling in the same partition. However, I do not understand the significance of the constructor argument:
new HashPartitioner(numPartitions) // What does numPartitions do?
For the above dataset, how would the results differ if I did
new HashPartitioner(1)
new HashPartitioner(2)
new HashPartitioner(10)
So how does HashPartitioner actually work?
Answer
Well, let's make your dataset marginally more interesting:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
We have six elements:
rdd.count
Long = 6
no partitioner:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
and eight partitions:
rdd.partitions.length
Int = 8
Now let's define a small helper to count the number of elements per partition:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Since we don't have a partitioner, our dataset is distributed uniformly between partitions (Default Partitioning Scheme in Spark):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
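If you want to peek at which keys landed where before any partitioner is involved, glom collects each partition into an array. This check is not part of the original walkthrough; the layout shown in the comment is just what parallelize's slicing of this particular sequence produces, so treat it as illustrative:
rdd.glom().map(_.map(_._1).toList).collect()
// e.g. Array(List(), List(1), List(1), List(2), List(), List(2), List(3), List(3))
// elements are split in input order, not grouped by key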
Now let's repartition our dataset:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:
rddOneP.partitions.length
Int = 1
Since we have only one partition, it contains all the elements:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Note that the order of values after the shuffle is non-deterministic.
If we use HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
we will get 2 partitions:
rddTwoP.partitions.length
Int = 2
Since rdd is partitioned by key, the data won't be distributed uniformly anymore:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Just to confirm the above:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Finally, with HashPartitioner(7) we get seven partitions, three of them non-empty with 2 elements each:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddSevenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
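As a final sanity check, mirroring the earlier Set-based check: with 7 partitions, keys 1, 2 and 3 map straight to partitions 1, 2 and 3 (hashCode mod 7), so the output should look like this:
rddSevenP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
// expected: Array(Set(), Set(1), Set(2), Set(3), Set(), Set(), Set())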
- HashPartitioner takes a single argument which defines the number of partitions.
- Values are assigned to partitions using the hash of the keys. The hash function may differ depending on the language (Scala RDDs may use hashCode, Datasets use MurmurHash 3, PySpark uses portable_hash). In a simple case like this, where the key is a small integer, you can assume that hash is an identity (i = hash(i)). The Scala API uses nonNegativeMod to determine the partition based on the computed hash (see the first sketch after this list). If the distribution of keys is not uniform, you can end up in situations where part of your cluster is idle (the second sketch after this list illustrates this).
- Keys have to be hashable. You can check my answer for A list as a key for PySpark's reduceByKey to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation: Java arrays have hashCodes based on their identities rather than their contents, so partitioning an RDD keyed by arrays with a HashPartitioner will produce unexpected or incorrect results.
- In Python 3 you have to make sure that hashing is consistent. See Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED.
- Hash partitioner is neither injective nor surjective. Multiple keys can be assigned to a single partition and some partitions can remain empty.
- Please note that currently hash-based methods don't work in Scala when combined with REPL-defined case classes (Case class equality in Apache Spark).
- HashPartitioner (or any other Partitioner) shuffles the data. Unless partitioning is reused between multiple operations, it doesn't reduce the amount of data to be shuffled.
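To make the nonNegativeMod bullet concrete, here is a rough sketch of the logic, not the verbatim Spark source: the partition index is the key's hashCode taken modulo numPartitions, forced into the range [0, numPartitions) because % in Scala can return negative values.
// Simplified sketch of how the partition index is derived from a key.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)   // shift negative remainders back into range
}

def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0                                            // null keys go to partition 0
  case _    => nonNegativeMod(key.hashCode, numPartitions)  // everything else: hash mod numPartitions
}

getPartition(1, 2)  // 1
getPartition(2, 2)  // 0
getPartition(3, 2)  // 1  -- which is why partition 0 held key 2 and partition 1 held keys 1 and 3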
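And to illustrate the idle-cluster bullet, a hypothetical skew scenario (the variable name, key and sizes are made up for the example): if every record shares one hot key, HashPartitioner sends everything to a single partition and the tasks for the other partitions have nothing to do.
import org.apache.spark.HashPartitioner

// All 1000 records share the key "hot-key", so they all hash to the same partition.
val skewed = sc.parallelize((1 to 1000).map(i => ("hot-key", i)))
  .partitionBy(new HashPartitioner(8))

skewed.mapPartitions(iter => Iterator(iter.length)).collect()
// one entry will be 1000, every other entry 0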