下午好!我有个问题:

val rdd1: RDD[(key, value)] = ...
val rdd2: RDD[(key, othervalue)] = ...


我想过滤rdd1并丢弃所有不在rdd2中的元素。我知道两种方法可以做到这一点。

第一:

val keySet = rdd2.map(_.key).distinct().collect().toSet
rdd1.filter(x => keySet.contains(x))


它不起作用,因为keySet很大并且不适合内存。

另一个:

rdd1.cogroup(rdd2)
  .filter(x => x._2._2.nonEmpty)
  .flatMap(x => x._2._1)


这里发生了一些事情,我得到两种错误(在不同的代码位置):java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUEjava.lang.ArrayIndexOutOfBoundException

我认为那是因为我的团队太大了。

那么我该如何解决呢?是否有解决此问题的常用方法?

最佳答案

您是否考虑过使用subtractByKey

类似于

rdd1.map(x => (x, x))
    .subtractByKey(rdd2)
    .map((k,v) => k)

08-18 08:14