我试图了解combineByKeys
中的每个步骤如何工作。
有人可以帮我了解下面的RDD吗?
val rdd = sc.parallelize(List(
("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),("B", 4),
("B", 10), ("B", 11), ("B", 20), ("B", 25),("C", 32), ("C", 91),
("C", 122), ("C", 3), ("C", 55)), 2)
rdd.combineByKey(
(x:Int) => (x, 1),
(acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
(acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
最佳答案
首先,让我们分解一下过程:
首先,createCombiner
为键在分区上的首次匹配创建初始值(合并器)(如果未找到)-->
(firstValueEncountered, 1)
。因此,这仅仅是用第一个值和键计数器为1初始化元组。
然后,仅当已经为该分区上的找到的键mergeValue
-->
创建了组合器(在我们的例子中为元组)时,才会触发(existingTuple._1 + subSequentValue, existingTuple._2 + 1)
。这将现有元组的值(在第一个插槽中)与新遇到的值相加,并获取现有元组的计数器(在第二个插槽中)并对其进行递增。所以,我们是
然后,mergeCombiner
接收在每个分区上创建的组合器(元组),并将它们合并在一起-->
(tupleFromPartition._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2)
。这只是将来自每个元组的值加在一起,并将计数器加在一起成为一个元组。
然后,让我们将数据的子集分解为多个分区,然后观察它们的作用:
("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)
分区1
A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)
分区2
A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1)
合并分区
A ==> mergeCombiner((12, 2), (12, 1)) ==> (12 + 12, 2 + 1)
B ==> mergeCombiner((11, 1), (14, 2)) ==> (11 + 14, 1 + 2)
因此,您应该返回类似以下的数组:
Array((A, (24, 3)), (B, (25, 3)))
关于apache-spark - createCombiner,mergeValue,mergeCombiner如何在Spark的CombineByKey中工作(使用Scala),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29246756/