Spark: elements fall into a single partition after repartition()

This article looks at why Spark's repartition() can place all elements into a single partition and how to deal with it; it should be a useful reference for anyone hitting the same problem.

Problem description

I am learning Spark, and when I tested the repartition() function in the PySpark shell with the expressions below, I observed a very strange result: all elements fall into the same partition after repartition(). Here, I used glom() to inspect the partitioning within the RDD. I was expecting repartition() to shuffle the elements and distribute them randomly among the partitions. This only happens when I repartition with a new number of partitions <= the original number of partitions.

During my test, if I set the new number of partitions > the original number of partitions, no shuffling is observed either. Am I doing anything wrong here?

In [1]: sc.parallelize(range(20), 8).glom().collect()
Out[1]:
[[0, 1],
 [2, 3],
 [4, 5],
 [6, 7, 8, 9],
 [10, 11],
 [12, 13],
 [14, 15],
 [16, 17, 18, 19]]

In [2]: sc.parallelize(range(20), 8).repartition(8).glom().collect()
Out[2]:
[[],
 [],
 [],
 [],
 [],
 [],
 [2, 3, 6, 7, 8, 9, 14, 15, 16, 17, 18, 19, 0, 1, 12, 13, 4, 5, 10, 11],
 []]

In [3]: sc.parallelize(range(20), 8).repartition(10).glom().collect()
Out[3]:
[[],
 [0, 1],
 [14, 15],
 [10, 11],
 [],
 [6, 7, 8, 9],
 [2, 3],
 [16, 17, 18, 19],
 [12, 13],
 [4, 5]]

I am using Spark version 2.1.1.

Recommended answer

Congratulations! You have just rediscovered SPARK-21782 (Repartition creates skews when numPartitions is a power of 2):

For each initial partition index, generate position as (new Random(index)).nextInt(numPartitions); then, for element number k in initial partition index, put it in the new partition position + k (modulo numPartitions).

So, essentially, elements are smeared roughly equally over numPartitions buckets, starting from the one with number position + 1.
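To make the quoted rule concrete, here is a minimal Python sketch of that placement logic (an illustration only, not Spark's actual code). The names place_elements and choose_start are made up for this sketch, the source partitions are copied from the first glom() output above, and the fixed starting position is an arbitrary stand-in for whatever (new Random(index)).nextInt(numPartitions) would return:

# Illustrative sketch of the placement rule quoted above (not Spark's code).
def place_elements(partitions, num_partitions, choose_start):
    """Assign each element of each source partition to a target bucket."""
    buckets = [[] for _ in range(num_partitions)]
    for index, items in enumerate(partitions):
        position = choose_start(index, num_partitions)  # one draw per source partition
        for k, item in enumerate(items, start=1):       # element number k ...
            buckets[(position + k) % num_partitions].append(item)  # ... goes to position + k
    return buckets

# Source partitions copied from the first glom() output above.
source = [[0, 1], [2, 3], [4, 5], [6, 7, 8, 9],
          [10, 11], [12, 13], [14, 15], [16, 17, 18, 19]]

# If every source partition draws the same starting position (the situation
# SPARK-21782 describes for power-of-two numPartitions), the elements crowd
# into a few consecutive buckets instead of spreading evenly:
print(place_elements(source, 8, lambda index, n: 5))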

Note that a new instance of Random is created for every initial partition index, with the fixed seed index, and then discarded. So the position is deterministic for every index of any RDD in the world. Also, the nextInt(bound) implementation has a special case when bound is a power of 2: it basically takes the several highest bits from the initial seed, with only minimal scrambling.
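To see why the position is both deterministic and nearly identical across small partition indices, here is a rough, self-contained Python re-implementation of java.util.Random's power-of-two fast path for nextInt, based on its standard LCG constants. It is only an illustration of the scrambling issue described above, and the function name is made up:

# Rough re-implementation of java.util.Random(seed).nextInt(bound) for a
# power-of-two bound, to illustrate the "minimal scrambling" point above.
MULTIPLIER = 0x5DEECE66D
ADDEND = 0xB
MASK = (1 << 48) - 1

def java_next_int_pow2(seed, bound):
    s = (seed ^ MULTIPLIER) & MASK        # initial scramble of the seed
    s = (s * MULTIPLIER + ADDEND) & MASK  # one LCG step (next(31))
    r = s >> (48 - 31)                    # keep the top 31 bits of the state
    return (bound * r) >> 31              # power-of-two case: keep only the highest bits

# With the small seeds 0..7 (the initial partition indices) and bound 8, the
# result depends only on the highest bits of the scrambled seed, so the
# starting positions tend to collapse to the same value for every partition.
for index in range(8):
    print(index, java_next_int_pow2(index, 8))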

PySpark makes it worse, because it uses a batched serializer with a default batch size of 10, so with a small number of items in each partition, everything is shuffled to the same output.
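As a practical aside (not part of the original answer), on Spark 2.1.x one way to sidestep this behaviour is to repartition by an explicit key, so that the standard hash partitioner decides the target partition instead of repartition()'s random placement. A sketch using only standard RDD operations:

# Workaround sketch (not from the original answer): key each element explicitly
# and let partitionBy's hash partitioner spread the keys across the partitions.
rdd = sc.parallelize(range(20), 8)
spread = (rdd.zipWithIndex()                 # (value, running index) pairs
             .map(lambda vi: (vi[1], vi[0])) # swap to (index, value)
             .partitionBy(8)                 # hash-partition on the index key
             .values())                      # drop the helper key
print(spread.glom().collect())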

The good news is that this has already been resolved in Spark 2.3, thanks to Sergey Serebryakov (https://github.com/megaserg).

That concludes this article on Spark's repartition() placing all elements into a single partition; hopefully the recommended answer above is helpful.
