我正在加入两个RDD rddArddB
rddA具有100个分区,而rddB具有500个分区。

我试图了解join操作的机制。默认情况下,无论连接的顺序如何,我最终都具有相同的分区结构;即rddA.join(rddB)和rddB.join(rddA)产生相同数量的分区,通过观察它使用较小的分区大小100。我知道我可以通过使用rddA.join(rddB,500)来增加分区大小,但是我对引擎盖下发生的事情更感兴趣以及为什么选择较小的尺寸。从观察来看,即使我重新分割了小的rdd,它的分割仍然会被使用; Spark是否对密钥大小进行启发式分析?

我遇到的另一个问题是偏斜度。我的较小分区最终有3,314个条目,而较大分区最终有1,139,207个,总大小为599,911,729(键)。两个RDD都使用默认的分区程序,那么如何确定数据混洗?
我隐约记得阅读过的内容,如果一个rdd设置了一个分区程序,那么将使用该分区程序。是这样吗是否“建议”这样做?

最后,请注意,我的两个rdd都相对较大(〜90GB),因此广播联接将无济于事。相反,任何为join操作提供一些见解的方法都可能是可行的方法。

PS。关于左右连接机制的任何详细信息将是额外的好处:)

最佳答案

尽管我还没有解释分区的派生方式,但是我确实发现了如何对数据进行混洗(这是我最初的问题)。联接具有一些副作用:

混洗/分区:
Spark将哈希分区“RDD”键并在“Workers”之间移动/分配。给定键的每组值(例如5)将最终存储在单个“Worker” / JVM中。这意味着,如果您的“联接”具有1..N关系并且N严重偏斜,则最终将导致偏斜的分区和JVM堆(即,一个“Partition”可能具有Max(N)而另一个Min(N) )。避免这种情况的唯一方法是在可能的情况下使用“广播”或忍受这种行为。由于您的数据最初将平均分配,因此改组的数量将取决于密钥哈希。

重新分区:
在“倾斜”的连接之后,调用“重新分区”似乎可以在分区之间平均地重新分配数据。因此,如果有不可避免的偏斜问题,这是一件好事。请注意,尽管此转换将触发大量重排,但是后续操作将更快。不利的一面是无法控制的对象创建(请参见下文)

对象创建/堆污染:
您设法加入数据,认为重新分区是重新平衡集群的一个好主意,但是由于某种原因,“重新分区”会触发“OOME”。发生的情况是,最初加入的数据重新使用了加入的对象。当您触发“重新分区”或涉及改组的任何其他“操作”时,例如额外的联接或“groupBy”(后跟“Action”),数据将被序列化,因此您将丢失Object的重用。对象反序列化后,它们现在是新实例。还要注意的是,在序列化过程中,重复使用会丢失,因此后备箱会很重。因此,就我而言,一个1..1000000联接(其中1是我的“重载”对象)将在执行任何触发随机播放的操作后失败。

解决方法/调试:

  • 我使用'mapPartitionsWithIndex'来调试分区大小,方法是返回单个项目'Iterable>',其中包含每个分区的计数。这非常有用,因为您可以在执行“操作”后看到“重新分区”的效果和分区状态。
  • 您可以在联接RDD上使用'countByKeyApprox'或'countByKey'来了解基数,然后分两步应用联接。高基数密钥使用“广播”,低基数密钥使用“联接”。将这些操作包装在“rdd.cache()”和“rdd.unpersist()”块中将大大加快此过程。尽管这可能会使您的代码稍微复杂一些,但是它将提供更好的性能,尤其是在您进行后续操作时。还要注意,如果在每个“地图”中都使用“广播”进行查找,则还将显着减小混排大小。
  • 对影响分区数量的其他操作进行“分区”可能非常有用,但是请注意,(随机)大量分区会引起更多的误解,因为给定键的大型集会创建较大的分区,但是其他分区的大小较小或为0。创建调试方法以获取分区的大小将有助于您选择合适的大小。
  • 07-24 22:09