Question
I read up on the documentation of HashPartitioner (http://spark.apache.org/docs/1.3.1/api/java/index.html?org/apache/spark/HashPartitioner.html). Unfortunately nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example, if my data is like
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
So the partitioner would put this into different partitions, with the same keys falling in the same partition. However, I do not understand the significance of the constructor argument
new HashPartitioner(numPartitions) // What does numPartitions do?
For the above dataset how would the results differ if I did
new HashPartitioner(1)
new HashPartitioner(2)
new HashPartitioner(10)
So how does HashPartitioner actually work?
Answer
Well, let's make your dataset marginally more interesting:
val rdd = sc.parallelize(for {
  x <- 1 to 3
  y <- 1 to 2
} yield (x, None), 8) // three distinct keys, each appearing twice, in 8 initial partitions
We have six elements:
scala> rdd.count
res32: Long = 6
No partitioner:
scala> rdd.partitioner
res33: Option[org.apache.spark.Partitioner] = None
and 8 partitions:
scala> rdd.partitions.length
res35: Int = 8
Now let's define a small helper to count the number of elements per partition:
import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
  rdd.mapPartitions(iter => Iterator(iter.length))
}
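As an aside (not part of the original answer), RDD.glom() gives an equivalent view if the mapPartitions idiom looks opaque; countByPartitionGlom is just an illustrative name:

// Equivalent sketch using glom(): each partition becomes an Array,
// and the length of that Array is the per-partition element count.
def countByPartitionGlom(rdd: RDD[(Int, None.type)]) =
  rdd.glom().map(_.length)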
Since we don't have a partitioner, our dataset is distributed uniformly between partitions:
scala> countByPartition(rdd).collect()
res43: Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Now let's repartition our dataset:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:
scala> rddOneP.partitions.length
res45: Int = 1
Since we have only one partition, it contains all elements:
scala> countByPartition(rddOneP).collect
res48: Array[Int] = Array(6)
In the same way, if we use HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
we will get 2 partitions:
scala> rddTwoP.partitions.length
res50: Int = 2
Since the RDD is partitioned by key, the data won't be distributed uniformly anymore:
scala> countByPartition(rddTwoP).collect()
res51: Array[Int] = Array(2, 4)
Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here:
scala> (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
res55: scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Just to confirm the above:
scala> rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
res58: Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Finally, with HashPartitioner(10) we get ten partitions, three non-empty with 2 elements each:
scala> val rddTenP = rdd.partitionBy(new HashPartitioner(10))
scala> rddTenP.partitions.length
res61: Int = 10
scala> countByPartition(rddTenP).collect()
res62: Array[Int] = Array(0, 2, 2, 2, 0, 0, 0, 0, 0, 0)
Summary
- HashPartitioner takes a single argument which defines the number of partitions
- values are assigned to partitions using the hashCode of the keys
- if the distribution of keys is not uniform you can end up in situations where part of your cluster is idle
- keys have to be hashable. You can check my answer for A list as a key for PySpark's reduceByKey to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation (a short sketch after this list illustrates it):
Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.
- In Python 3 you have to make sure that hashing is consistent. See What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?
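A quick way to see the array problem mentioned above (a sketch, not from the original answer): two arrays with identical contents will typically have different identity-based hashCodes, so a HashPartitioner would scatter them across partitions instead of grouping them, while wrapping them in a Seq restores content-based hashing:

// Java/Scala arrays hash by identity, not by content.
val a = Array(1, 2, 3)
val b = Array(1, 2, 3)
println(a.hashCode == b.hashCode)             // almost certainly false
println(a.toSeq.hashCode == b.toSeq.hashCode) // true: Seqs hash by content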