This article covers how to join two RDD[String]s in Spark Scala; it may be a useful reference if you are facing the same problem.

Problem description

I have two RDDs:

rdd1 [String,String,String]: Name, Address, Zipcode
rdd2 [String,String,String]: Name, Address, Landmark


I am trying to join these two RDDs using rdd1.join(rdd2), but I am getting an error:

error: value fullOuterJoin is not a member of org.apache.spark.rdd.RDD[String]


The join should join the RDD[String]s, and the output RDD should be something like:

rddOutput : Name,Address,Zipcode,Landmark


In the end, I want to save the result as a JSON file.

Can someone help me with this?

Answer


As said in the comments, you have to convert your RDDs to PairRDDs before joining, which means each RDD must be of type RDD[(key, value)]. Only then can you perform the join by key. In your case, the key is composed of (Name, Address), so you would have to do something like:

// First, we create the first PairRDD, with (name, address) as key and zipcode as value:
val pairRDD1 = rdd1.map { case (name, address, zipcode) => ((name, address), zipcode) }
// Then, we create the second PairRDD, with (name, address) as key and landmark as value:
val pairRDD2 = rdd2.map { case (name, address, landmark) => ((name, address), landmark) }

// Now we can join them.
// The result is an RDD of ((name, address), (zipcode, landmark)), so we can
// map to the desired format. Note that fullOuterJoin wraps both values in
// Option[String] (None when the key is missing on one side), so zipcode and
// landmark in the output tuple are Option[String]:
val joined = pairRDD1.fullOuterJoin(pairRDD2).map {
  case ((name, address), (zipcode, landmark)) => (name, address, zipcode, landmark)
}
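The question also asks about saving the result as JSON. One simple sketch: format each joined record as one JSON object per line (the "JSON Lines" layout that spark.read.json understands) and save the resulting RDD[String] with saveAsTextFile. Here `toJsonLine` is a helper defined purely for illustration (its quoting is simplistic and does not escape embedded quotes), and the output path is a made-up example; the saveAsTextFile call is shown commented out because it needs a running SparkContext:

```scala
// Illustrative helper: render one joined record as a JSON object.
// zipcode and landmark are Option[String] because fullOuterJoin was used;
// a missing value is emitted as JSON null.
def toJsonLine(name: String, address: String,
               zipcode: Option[String], landmark: Option[String]): String = {
  // Render an optional value as a quoted string, or JSON null when absent.
  def field(v: Option[String]): String =
    v.map(s => "\"" + s + "\"").getOrElse("null")
  s"""{"Name":"$name","Address":"$address",""" +
    s""""Zipcode":${field(zipcode)},"Landmark":${field(landmark)}}"""
}

// On the joined RDD from the answer (needs a running SparkContext):
// joined.map { case (n, a, z, l) => toJsonLine(n, a, z, l) }
//       .saveAsTextFile("output/joined-json")
```

Alternatively, if a SparkSession is available, converting the RDD to a DataFrame (e.g. with toDF) and calling write.json on it is the more idiomatic route, and it handles quoting and escaping for you.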

For more details, see the Scala API documentation for Spark.

