This article describes how to join 3 RDDs using Spark Scala. It should be a useful reference for anyone facing the same problem; let's work through it below.

Problem Description


I want to join 3 tables using Spark RDDs. I achieved my objective using Spark SQL, but when I try to join them using RDDs I do not get the desired results. Below is my query using Spark SQL and its output:

scala> actorDF.as("df1").join(movieCastDF.as("df2"),$"df1.act_id"===$"df2.act_id").join(movieDF.as("df3"),$"df2.mov_id"===$"df3.mov_id").
filter(col("df3.mov_title")==="Annie Hall").select($"df1.act_fname",$"df1.act_lname",$"df2.role").show(false)
+---------+---------+-----------+
|act_fname|act_lname|role       |
+---------+---------+-----------+
|Woody    |Allen    |Alvy Singer|
+---------+---------+-----------+
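For reference, the DataFrames used in that query could be built along these lines. This is only a sketch: the file paths, separator, and schema strings are assumptions inferred from the query and from the RDD contents shown further down, not something given in the original question.

// Hedged sketch only: paths and column names below are assumptions, not from the question.
val actorDF = spark.read.option("sep", "\t")
  .schema("act_id STRING, act_fname STRING, act_lname STRING, act_gender STRING")
  .csv("actor.tsv")

val movieCastDF = spark.read.option("sep", "\t")
  .schema("act_id STRING, mov_id STRING, role STRING")
  .csv("movie_cast.tsv")

val movieDF = spark.read.option("sep", "\t")
  .schema("mov_id STRING, mov_title STRING, mov_year STRING, mov_time STRING, mov_lang STRING, mov_dt_rel STRING, mov_rel_country STRING")
  .csv("movie.tsv")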


Now I have created paired RDDs for the three datasets, as shown below:

scala> val actPairedRdd=actRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2),p(3))))

scala> actPairedRdd.take(5).foreach(println)

(101,(James,Stewart,M))
(102,(Deborah,Kerr,F))
(103,(Peter,OToole,M))
(104,(Robert,De Niro,M))
(105,(F. Murray,Abraham,M))

scala> val movieCastPairedRdd=movieCastRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2))))
movieCastPairedRdd: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[318] at map at <console>:29

scala> movieCastPairedRdd.foreach(println)
(101,(901,John Scottie Ferguson))
(102,(902,Miss Giddens))
(103,(903,T.E. Lawrence))
(104,(904,Michael))
(105,(905,Antonio Salieri))
(106,(906,Rick Deckard))


scala> val moviePairedRdd=movieRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2),p(3),p(4),p(5),p(6))))
moviePairedRdd: org.apache.spark.rdd.RDD[(String, (String, String, String, String, String, String))] = MapPartitionsRDD[322] at map at <console>:29

scala> moviePairedRdd.take(2).foreach(println)
(901,(Vertigo,1958,128,English,1958-08-24,UK))
(902,(The Innocents,1961,100,English,1962-02-19,SW))


Here actPairedRdd and movieCastPairedRdd are linked with each other, and movieCastPairedRdd and moviePairedRdd are linked, since each pair shares a common column.
Now when I join all three datasets I am not getting any data:

scala> actPairedRdd.join(movieCastPairedRdd).join(moviePairedRdd).take(2).foreach(println)


I am getting blank records, so where am I going wrong? Thanks in advance.

Recommended Answer


Joins like this with RDDs are painful; that is another reason why DataFrames are nicer.


You get no data because a pair RDD join matches on the K part of (K, V), and the keys of your intermediate result have nothing in common with the keys of the last RDD. The keys 101, 102 will join, but they share nothing with 901, 902. You need to shift things around, like this, in my more limited example:

val rdd1 = sc.parallelize(Seq(
           (101,("James","Stewart","M")),
           (102,("Deborah","Kerr","F")),
           (103,("Peter","OToole","M")),
           (104,("Robert","De Niro","M"))
           ))

val rdd2 = sc.parallelize(Seq(
           (101,(901,"John Scottie Ferguson")),
           (102,(902,"Miss Giddens")),
           (103,(903,"T.E. Lawrence")),
           (104,(904,"Michael"))
           ))

val rdd3 = sc.parallelize(Seq(
          (901,("Vertigo",1958 )),
          (902,("The Innocents",1961))
          ))

val rdd4 = rdd1.join(rdd2)

val new_rdd4 = rdd4.keyBy(x => x._2._2._1)  // Redefine Key for join with rdd3
val rdd5 = rdd3.join(new_rdd4)
rdd5.collect

Returns:

res14: Array[(Int, ((String, Int), (Int, ((String, String, String), (Int, String)))))] = Array((901,((Vertigo,1958),(101,((James,Stewart,M),(901,John Scottie Ferguson))))), (902,((The Innocents,1961),(102,((Deborah,Kerr,F),(902,Miss Giddens))))))


You will need to strip out the data via a map; I leave that to you. It is an INNER join by default.
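As an illustration of that final map, here is a minimal sketch based on the nested structure shown above; the variable name result and the pattern layout are my own, and the filter on the title is only analogous to the mov_title === "Annie Hall" condition from the original SQL query:

// Hedged sketch: unpack the nested tuples of rdd5 and keep only the fields of interest.
// Structure: (mov_id, ((mov_title, mov_year), (act_id, ((fname, lname, gender), (mov_id, role)))))
val result = rdd5
  .filter { case (_, ((title, _), _)) => title == "Vertigo" }   // analogous to the "Annie Hall" filter
  .map    { case (_, ((_, _), (_, ((fname, lname, _), (_, role))))) => (fname, lname, role) }

result.collect.foreach(println)
// (James,Stewart,John Scottie Ferguson)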

That concludes this article on how to join 3 RDDs using Spark Scala. We hope the recommended answer above is helpful.
