我有RDD类型的4rdds:((int,int,int),值),我的rdds是

rdd1: ((a,b,c), value)
rdd2:((a,d,e),valueA)
rdd3:((f,b,g),valueB)
rdd4:((h,i,c),valueC)


我怎样才能像rdd1一样在rdd1上加入rdd2 rdd1在“ b”上加入rdd2而rdd1在“ c”上加入rdd3

所以输出是Scala中的finalRdd: ((a,b,c),valueA,valueB,valueC,value))

我尝试使用collectAsMap进行此操作,但效果不佳并引发异常

仅用于rdd1的代码加入rdd2

val newrdd2=rdd2.map{case( (a,b,c),d)=>(a,d)}.collectAsMap
val joined=rdd1.map{case( (a,b,c),d)=>(newrdd2.get(a).get,b,c,d)}




rdd1: ((1,2,3),animals)
rdd2:((1,anyInt,anyInt),cat)
rdd3:((anyInt,2,anyInt),cow )
rdd 4: ((anyInt,anyInt,3),parrot)


输出应为((1,2,3),animals,cat,cow,parrot )

最佳答案

在RDD上有一个方便的join方法,但是您需要使用特定的连接密钥对其进行键控,这是Spark用于分区和改组的方式。

the docs


join(otherDataset,[numTasks]):在(K,V)和(K,W)类型的数据集上调用时,返回(K,(V,W))对的数据集,其中每个键都有所有成对的元素。通过leftOuterJoin,rightOuterJoin和fullOuterJoin支持外部联接。


我无法编译自己所在的位置,但是通过手工它会变成这样:

val rdd1KeyA = rdd1.map(x => (x._1._1, (x._1._2, x._1._3. x._2) // RDD(a, (b,c,value))
val rdd2KeyA = rdd2.map(x => (x._1._1, x._2) // RDD(a, valueA)
val joined1 = rdd1KeyA.join(rdd2KeyA) // RDD(a, ((b,c,value), valueA))

val rdd3KeyB = rdd3.map(x => (x._1._2, x._2) // RDD(b, valueB)
val joined1KeyB = joined1.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._1._3. x._2._2) // RDD(b, (a, c, value, valueA))
val joined2 = joined1KeyB.join(rdd3keyB) // RDD(b, ((a, c, value, valueA), valueB))


...等等

避免使用collect*函数,因为它们不使用数据的分布式特性并且容易在大负载下失败,它们会将RDD上的所有数据混洗到主节点上的内存中集合中,这可能会使所有数据消耗blowing尽。

10-06 07:11