下午好!我有个问题:
val rdd1: RDD[(key, value)] = ...
val rdd2: RDD[(key, othervalue)] = ...
我想过滤
rdd1
并丢弃所有不在rdd2
中的元素。我知道两种方法可以做到这一点。第一:
val keySet = rdd2.map(_.key).distinct().collect().toSet
rdd1.filter(x => keySet.contains(x))
它不起作用,因为
keySet
很大并且不适合内存。另一个:
rdd1.cogroup(rdd2)
.filter(x => x._2._2.nonEmpty)
.flatMap(x => x._2._1)
这里发生了一些事情,我得到两种错误(在不同的代码位置):
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
和java.lang.ArrayIndexOutOfBoundException
我认为那是因为我的团队太大了。
那么我该如何解决呢?是否有解决此问题的常用方法?
最佳答案
您是否考虑过使用subtractByKey
?
类似于
rdd1.map(x => (x, x))
.subtractByKey(rdd2)
.map((k,v) => k)