当我需要在RDD中对数据进行分组时,我总是使用reduceByKey
,因为它在混洗数据之前执行了映射方面的减少,这通常意味着更少的数据被混洗,因此我可以获得更好的性能。即使地图端的reduce函数收集了所有值并且实际上并未减少数据量,我仍然使用reduceByKey
,因为我假设reduceByKey
的性能永远不会比groupByKey
差。但是,我想知道这个假设是否正确,或者是否确实存在应该优先使用groupByKey
的情况?
最佳答案
我相信climbage和eliasah会忽略该问题的其他方面:
代码可读性
代码可维护性
代码库大小
如果操作不能减少数据量,则必须采用一种与GroupByKey
语义等效的方式。假设我们有RDD[(Int,String)]
:
import scala.util.Random
Random.setSeed(1)
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
并且我们想连接给定键的所有字符串。使用
groupByKey
非常简单:rdd.groupByKey.mapValues(_.mkString(""))
reduceByKey
的天真解决方案如下所示:rdd.reduceByKey(_ + _)
它简短且可以说很容易理解,但存在两个问题:
效率极低,因为它每次都会创建一个新的
String
对象*建议您执行的操作比实际操作要便宜,尤其是在仅分析DAG或调试字符串的情况下
为了解决第一个问题,我们需要一个可变的数据结构:
import scala.collection.mutable.StringBuilder
rdd.combineByKey[StringBuilder](
(s: String) => new StringBuilder(s),
(sb: StringBuilder, s: String) => sb ++= s,
(sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
它仍然暗示着其他事情确实正在发生并且非常冗长,特别是如果在脚本中重复多次。您当然可以提取匿名函数
val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
sb1.append(sb2)
rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
但归根结底,这仍然意味着您需要付出更多的努力来理解该代码,增加复杂性并且没有真正的附加值。我发现特别麻烦的一件事是明确包含可变数据结构。即使Spark处理几乎所有复杂性,这也意味着我们不再拥有优雅的,参照透明的代码。
我的意思是,如果您确实要减少数据量,请使用
reduceByKey
。否则,您将使代码更难编写,更难分析且一无所获。注意:
这个答案集中在Scala
RDD
API上。当前的Python实现与JVM的实现有很大不同,并且包括优化,在类似reduceByKey
的操作中,这些优化相对于朴素的groupBy
实现具有明显的优势。有关
Dataset
API,请参见DataFrame / Dataset groupBy behaviour/optimization。*有关令人信服的示例,请参见Spark performance for Scala vs Python
关于apache-spark - 是groupByKey优先于reduceByKey吗,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33221713/