我有两个rdd,我想为rdd1的每个项目对RDD2项目进行一些计算。因此,我在如下所示的用户定义函数中传递了RDD2,但出现了类似rdd1 cannot be passed in another rdd的错误。如果要对两个rdd执行操作,是否可以知道如何实现?

例如:
RDD1.map(line =>function(line,RDD2))

最佳答案

如错误所示,Spark不支持嵌套RDD。通常,您必须通过重新设计算法来解决它。

如何执行取决于实际用例,function中到底发生了什么以及输出了什么。

有时RDD1.cartesian(RDD2),每个元组执行操作,然后按键减少将起作用。有时,如果您使用(K,V)类型,则两个RDD之间的联接都会起作用。

如果RDD2很小,您可以随时将其收集在驱动程序中,使其成为广播变量,并在function而不是RDD2中使用该变量。

@编辑:

例如,让我们假设您的RDD包含字符串,并且function将计算RDDRDD2中给定记录发生的次数:

def function(line: String, rdd: RDD[String]): (String, Int) = {
   (line, rdd.filter(_ == line).count)
}

这将返回RDD[(String, Int)]

Idea1

您可以尝试使用RDD的cartesian方法使用cartesian product
val cartesianProduct = RDD1.cartesian(RDD2) // creates RDD[(String, String)]
                           .map( (r1,r2) => (r1, function2) ) // creates RDD[(String, Int)]
                           .reduceByKey( (c1,c2) => c1 + c2 ) // final RDD[(String, Int)]

这里function2接受r1r2(它们是字符串),如果相等则返回1,否则返回0。最终的映射将生成一个RDD,它具有元组,其中键将是r1中的记录,而值将是总计数。

问题1:但是,如果RDD1中有重复的字符串,则此方法将无效。您必须考虑一下。如果RDD1记录具有一些唯一的ID,那将是完美的。

问题2:这确实会创建很多对(对于两个RDD中的100万条记录,它将创建大约500亿对),速度很慢,并且很可能会导致生成许多shuffling

Idea2

我不理解您对RDD2大小lacs的评论,因此这可能会或可能不会起作用:
val rdd2array = sc.broadcast(RDD2.collect())
val result = RDD1.map(line => function(line, rdd2array))

问题:这可能会耗尽您的内存。在collect()上调用driver,并且来自allrdd2记录将被加载到驱动程序节点上的内存中。

Idea3

根据用例,还有其他方法可以解决此问题,例如brute force algorithm for Similarity Search与您的用例相似(双关语意)。替代解决方案之一是Locality Sensitive Hashing

关于scala - 如何通过.map在另一个RDD中传递一个RDD,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/34823732/

10-12 18:25