当我需要在RDD中对数据进行分组时,我总是使用reduceByKey,因为它在混洗数据之前执行了映射方面的减少,这通常意味着更少的数据被混洗,因此我可以获得更好的性能。即使地图端的reduce函数收集了所有值并且实际上并未减少数据量,我仍然使用reduceByKey,因为我假设reduceByKey的性能永远不会比groupByKey差。但是,我想知道这个假设是否正确,或者是否确实存在应该优先使用groupByKey的情况?

最佳答案

我相信climbageeliasah会忽略该问题的其他方面:


代码可读性
代码可维护性
代码库大小


如果操作不能减少数据量,则必须采用一种与GroupByKey语义等效的方式。假设我们有RDD[(Int,String)]

import scala.util.Random
Random.setSeed(1)

def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")

val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))


并且我们想连接给定键的所有字符串。使用groupByKey非常简单:

rdd.groupByKey.mapValues(_.mkString(""))


reduceByKey的天真解决方案如下所示:

rdd.reduceByKey(_ + _)


它简短且可以说很容易理解,但存在两个问题:


效率极低,因为它每次都会创建一个新的String对象*
建议您执行的操作比实际操作要便宜,尤其是在仅分析DAG或调试字符串的情况下


为了解决第一个问题,我们需要一个可变的数据结构:

import scala.collection.mutable.StringBuilder

rdd.combineByKey[StringBuilder](
    (s: String) => new StringBuilder(s),
    (sb: StringBuilder, s: String) => sb ++= s,
    (sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)


它仍然暗示着其他事情确实正在发生并且非常冗长,特别是如果在脚本中重复多次。您当然可以提取匿名函数

val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
  sb1.append(sb2)

rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)


但归根结底,这仍然意味着您需要付出更多的努力来理解该代码,增加复杂性并且没有真正的附加值。我发现特别麻烦的一件事是明确包含可变数据结构。即使Spark处理几乎所有复杂性,这也意味着我们不再拥有优雅的,参照透明的代码。

我的意思是,如果您确实要减少数据量,请使用reduceByKey。否则,您将使代码更难编写,更难分析且一无所获。

注意:

这个答案集中在Scala RDD API上。当前的Python实现与JVM的实现有很大不同,并且包括优化,在类似reduceByKey的操作中,这些优化相对于朴素的groupBy实现具有明显的优势。

有关Dataset API,请参见DataFrame / Dataset groupBy behaviour/optimization



*有关令人信服的示例,请参见Spark performance for Scala vs Python

关于apache-spark - 是groupByKey优先于reduceByKey吗,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33221713/

10-11 07:43