This article covers a question about Apache Spark: how to join two RDDs that have different partitioners. The recommended answer should be a useful reference for anyone facing the same problem.

Problem Description

I have two RDDs with different partitioners.

case class Person(name: String, age: Int, school: String)
case class School(name: String, address: String)

rdd1 is the RDD of Person, which I have partitioned based on the person's age, and then converted the key to school.

// `persons: RDD[Person]` is an assumed source RDD here; the original snippet
// referenced rdd1 inside its own definition, which does not compile.
// keyBy yields elements of ((age, person), person), and after re-keying by
// school the element type is (String, Person). Note that mapPartitions drops
// the partitioner unless preservesPartitioning = true is passed.
val rdd1: RDD[(String, Person)] = persons.keyBy(person => (person.age, person))
                            .partitionBy(new HashPartitioner(10))
                            .mapPartitions(iter =>
                                 iter.map { case ((age, person), _) =>
                                    (person.school, person)
                                 })

rdd2 is the RDD of School, grouped by the name of the school.

// `schools: RDD[School]` is an assumed source RDD. groupBy(_.name) yields
// (name, all schools with that name), so the element type is
// (String, Iterable[School]).
val rdd2: RDD[(String, Iterable[School])] = schools.groupBy(_.name)

Now, rdd1 is partitioned based on the age of the person, so all persons with the same age go to the same partition. And rdd2 is partitioned (by default) based on the name of the school.

I want to call rdd1.leftOuterJoin(rdd2) in such a way that rdd1 doesn't get shuffled, because rdd1 is very large compared to rdd2. Also, I'm outputting the result to Cassandra, which is partitioned on age, so the current partitioning of rdd1 will speed up the writing process later.

Is there a way to join these two RDDs without: 1. shuffling rdd1, and 2. broadcasting rdd2 (because rdd2 is bigger than the available memory)?

Note: The joined RDD should be partitioned based on age.
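
For reference, the call the question has in mind would look roughly like this, using the rdd1/rdd2 definitions above (a sketch only; how to make it cheap is exactly what is being asked):

// With rdd1 keyed by school and rdd2 keyed by school name, the join key
// lines up and the result carries an Option for unmatched schools.
val joined: RDD[(String, (Person, Option[Iterable[School]]))] =
    rdd1.leftOuterJoin(rdd2)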

Recommended Answer

Suppose you have two RDDs, rdd1 and rdd2, and you want to apply a join operation. If the RDDs are already partitioned (a partitioner is set), then calling rdd3 = rdd1.join(rdd2) will make rdd3 partitioned by rdd1. rdd3 will always take the hash partitioner from rdd1 (the first parent, the one that join was called on).
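
A minimal sketch of that behaviour, with made-up data and names: the left RDD is pre-partitioned and the right one is not, so the join reuses the left partitioner, the left side keeps its layout, and only the right side is shuffled to match it.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("join-partitioner-demo").setMaster("local[2]"))
val partitioner = new HashPartitioner(10)

// Pre-partitioned "big" side; the "small" side has no partitioner.
val big: RDD[(String, Int)] =
    sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).partitionBy(partitioner)
val small: RDD[(String, String)] =
    sc.parallelize(Seq(("a", "x"), ("b", "y")))

// The join reuses the existing partitioner of its pre-partitioned parent,
// so `big` is not shuffled again; only `small` is moved to match it.
val joined: RDD[(String, (Int, Option[String]))] = big.leftOuterJoin(small)
println(joined.partitioner.contains(partitioner)) // true

sc.stop()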

This concludes the article on joining two RDDs with different partitioners in Apache Spark; hopefully the recommended answer is helpful.
