我可以使用以下代码在两个RDD中打印数据。

usersRDD.foreach(println)
empRDD.foreach(println)

我需要比较两个RDD中的数据。如何在一个RDD中将字段数据与另一个RDD中的字段数据进行迭代和比较。例如:对记录进行迭代,并检查userRDD中的名称和年龄是否与empRDD中的记录匹配,如果没有放在单独的RDD中。

我尝试了userRDD.substract(empRDD),但它正在比较所有字段。

最佳答案

您需要在每个RDD中键入数据,以便有一些东西可以连接记录。例如,看看groupBy。然后,您对生成的RDD进行join。对于每个键,您都会在两个键中获得匹配的值。如果您有兴趣查找不匹配的 key ,请使用leftOuterJoin,如下所示:

// Returns the entries in userRDD that have no corresponding key in empRDD.
def nonEmp(userRDD: RDD[(String, String)], empRDD: RDD[(String, String)]) = {
  userRDD.leftOuterJoin(empRDD).collect {
    case (name, (age, None)) => name -> age
  }
}

09-11 21:27