我正在加入两个RDD rddA
和rddB
。rddA
具有100个分区,而rddB
具有500个分区。
我试图了解join
操作的机制。默认情况下,无论连接的顺序如何,我最终都具有相同的分区结构;即rddA.join(rddB
)和rddB.join(rddA)
产生相同数量的分区,通过观察它使用较小的分区大小100。我知道我可以通过使用rddA.join(rddB,500)
来增加分区大小,但是我对引擎盖下发生的事情更感兴趣以及为什么选择较小的尺寸。从观察来看,即使我重新分割了小的rdd
,它的分割仍然会被使用; Spark是否对密钥大小进行启发式分析?
我遇到的另一个问题是偏斜度。我的较小分区最终有3,314个条目,而较大分区最终有1,139,207个,总大小为599,911,729(键)。两个RDD都使用默认的分区程序,那么如何确定数据混洗?
我隐约记得阅读过的内容,如果一个rdd
设置了一个分区程序,那么将使用该分区程序。是这样吗是否“建议”这样做?
最后,请注意,我的两个rdd
都相对较大(〜90GB),因此广播联接将无济于事。相反,任何为join
操作提供一些见解的方法都可能是可行的方法。
PS。关于左右连接机制的任何详细信息将是额外的好处:)
最佳答案
尽管我还没有解释分区的派生方式,但是我确实发现了如何对数据进行混洗(这是我最初的问题)。联接具有一些副作用:
混洗/分区:
Spark将哈希分区“RDD”键并在“Workers”之间移动/分配。给定键的每组值(例如5)将最终存储在单个“Worker” / JVM中。这意味着,如果您的“联接”具有1..N关系并且N严重偏斜,则最终将导致偏斜的分区和JVM堆(即,一个“Partition”可能具有Max(N)而另一个Min(N) )。避免这种情况的唯一方法是在可能的情况下使用“广播”或忍受这种行为。由于您的数据最初将平均分配,因此改组的数量将取决于密钥哈希。
重新分区:
在“倾斜”的连接之后,调用“重新分区”似乎可以在分区之间平均地重新分配数据。因此,如果有不可避免的偏斜问题,这是一件好事。请注意,尽管此转换将触发大量重排,但是后续操作将更快。不利的一面是无法控制的对象创建(请参见下文)
对象创建/堆污染:
您设法加入数据,认为重新分区是重新平衡集群的一个好主意,但是由于某种原因,“重新分区”会触发“OOME”。发生的情况是,最初加入的数据重新使用了加入的对象。当您触发“重新分区”或涉及改组的任何其他“操作”时,例如额外的联接或“groupBy”(后跟“Action”),数据将被序列化,因此您将丢失Object的重用。对象反序列化后,它们现在是新实例。还要注意的是,在序列化过程中,重复使用会丢失,因此后备箱会很重。因此,就我而言,一个1..1000000联接(其中1是我的“重载”对象)将在执行任何触发随机播放的操作后失败。
解决方法/调试: