Problem Description
I would like to know the best way to replace the groupByKey operation with another one.
Basically I would like to obtain an RDD[(Int, List[Measure])]. My situation:
// consider measures as an RDD of Measure objects
measures.keyBy(_.getId)
.groupByKey
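For context, every snippet here assumes a minimal setup along these lines. This is a hypothetical sketch: only getId is implied by the question; the value field, app name, and local master are made up for illustration.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical Measure type: only getId is implied by the question;
// the value field is invented for the examples below.
case class Measure(getId: Int, value: Double)

val sc = new SparkContext(new SparkConf().setAppName("measures").setMaster("local[*]"))
// measures: an RDD[Measure]
val measures = sc.parallelize(Seq(Measure(1, 2.0), Measure(1, 3.5), Measure(2, 1.0)))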
My idea is to use reduceByKey instead, because it causes less shuffling:
measures.keyBy(_.getId)
.mapValues(List(_))
.reduceByKey(_++_)
But I think this is very inefficient, because it forces me to instantiate a ton of unnecessary List objects.
Does anyone have another idea for replacing groupByKey?
Recommended Answer
Another way is to use aggregateByKey, which is specifically for combining values into a type different from that of the original values:
measures.keyBy(_.getId)
.aggregateByKey(List[Measure]())(_ :+ _, _ ++ _)
This creates an empty list for each key in each partition, appends all values to these lists within each partition, then finally shuffles the lists to concatenate all values for each key.
Appending to a list in Scala is O(n); it is better to prepend, which is O(1), but that looks a bit less clean:
measures.keyBy(_.getId)
.aggregateByKey(List[Measure]())(_.+:(_), _ ++ _)
Or:
measures.keyBy(_.getId)
.aggregateByKey(List[Measure]())((l, v) => v +: l, _ ++ _)
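One caveat, as a side note not in the original answer: prepending builds each per-partition list in reverse order. Spark makes no ordering guarantee for grouped values after a shuffle anyway, but if you want to undo the reversal, a final per-key reverse is cheap:

measures.keyBy(_.getId)
  .aggregateByKey(List[Measure]())((l, v) => v +: l, _ ++ _)
  .mapValues(_.reverse)  // undo the per-partition reversal caused by prepending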
This is probably more efficient than your reduceByKey example, but the situations where reduceByKey and aggregateByKey are far superior to groupByKey are those where you can first make a large reduction in data size and only shuffle the much smaller results around. In this case you don't have that reduction: the intermediate lists contain all the data you started out with, so you are still shuffling your full data set when the per-partition lists are combined (and this holds similarly for reduceByKey).
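To make the contrast concrete, here is a hedged sketch of the kind of job where reduceByKey does pay off: the per-key state collapses to a single number, so each partition pre-aggregates before the shuffle and only tiny records cross the network (this uses the hypothetical value field from the setup sketch above):

// Sum of measure values per id: map-side combining reduces each
// partition to at most one Double per key before shuffling.
measures.keyBy(_.getId)
  .mapValues(_.value)
  .reduceByKey(_ + _)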
Moreover, as zero323 pointed out, groupByKey is actually more efficient in this case, because it knows it is building lists of all the data and can perform optimisations specifically for that:
- It disables map-side aggregation, which prevents building a big hash map containing all the data.
- It uses a smart buffer (CompactBuffer), which significantly reduces the number of memory allocations compared to building up immutable lists one by one.
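Given those optimisations, a plain sketch of the original goal using groupByKey directly; groupByKey yields an Iterable per key, so producing a List is one extra mapValues step:

// Result type: RDD[(Int, List[Measure])], built on groupByKey's
// CompactBuffer internally, converted to List only at the end.
measures.keyBy(_.getId)
  .groupByKey()
  .mapValues(_.toList)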
Another situation where the difference between groupByKey and reduceByKey or aggregateByKey may be minimal is when the number of keys isn't much smaller than the number of values: if almost every Measure has a unique id, each group holds roughly one element, so there is hardly anything to combine before the shuffle.