我有两个rdd,我想为rdd1的每个项目对RDD2项目进行一些计算。因此,我在如下所示的用户定义函数中传递了RDD2,但出现了类似rdd1 cannot be passed in another rdd
的错误。如果要对两个rdd执行操作,是否可以知道如何实现?
例如:RDD1.map(line =>function(line,RDD2))
最佳答案
如错误所示,Spark不支持嵌套RDD。通常,您必须通过重新设计算法来解决它。
如何执行取决于实际用例,function
中到底发生了什么以及输出了什么。
有时RDD1.cartesian(RDD2)
,每个元组执行操作,然后按键减少将起作用。有时,如果您使用(K,V)
类型,则两个RDD之间的联接都会起作用。
如果RDD2很小,您可以随时将其收集在驱动程序中,使其成为广播变量,并在function
而不是RDD2
中使用该变量。
@编辑:
例如,让我们假设您的RDD包含字符串,并且function
将计算RDD
中RDD2
中给定记录发生的次数:
def function(line: String, rdd: RDD[String]): (String, Int) = {
(line, rdd.filter(_ == line).count)
}
这将返回
RDD[(String, Int)]
。Idea1
您可以尝试使用RDD的
cartesian
方法使用cartesian product。val cartesianProduct = RDD1.cartesian(RDD2) // creates RDD[(String, String)]
.map( (r1,r2) => (r1, function2) ) // creates RDD[(String, Int)]
.reduceByKey( (c1,c2) => c1 + c2 ) // final RDD[(String, Int)]
这里
function2
接受r1
和r2
(它们是字符串),如果相等则返回1
,否则返回0
。最终的映射将生成一个RDD
,它具有元组,其中键将是r1
中的记录,而值将是总计数。问题1:但是,如果
RDD1
中有重复的字符串,则此方法将无效。您必须考虑一下。如果RDD1
记录具有一些唯一的ID,那将是完美的。问题2:这确实会创建很多对(对于两个RDD中的100万条记录,它将创建大约500亿对),速度很慢,并且很可能会导致生成许多shuffling。
Idea2
我不理解您对RDD2大小
lacs
的评论,因此这可能会或可能不会起作用:val rdd2array = sc.broadcast(RDD2.collect())
val result = RDD1.map(line => function(line, rdd2array))
问题:这可能会耗尽您的内存。在
collect()
上调用driver
,并且来自all
的rdd2
记录将被加载到驱动程序节点上的内存中。Idea3
根据用例,还有其他方法可以解决此问题,例如brute force algorithm for Similarity Search与您的用例相似(双关语意)。替代解决方案之一是Locality Sensitive Hashing。
关于scala - 如何通过.map在另一个RDD中传递一个RDD,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/34823732/