我正在学习spark,但是我无法理解此功能combineByKey

>>> data = sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)] )
>>> data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+c2).collect()

输出为:
[('A', '1_2_'), ('C', '1_'), ('B', '1_2_')]

首先,我很困惑:第二步@中的lambda c, v : c+"@"+v在哪里?我从结果中找不到@

其次,我阅读了combineByKey的功能描述,但我对算法流程感到困惑。

最佳答案

groupByKey调用不会尝试合并/合并值,因此这是一项昂贵的操作。

因此,combineByKey调用就是这样的优化。使用combineByKey时,在每个分区上将值合并为一个值,然后将每个分区值合并为一个值。值得注意的是,合并值的类型不必与原始值的类型相匹配,而且通常不需要。 combineByKey函数采用3个函数作为参数:

  • 创建组合器的函数。在aggregateByKey函数中,第一个参数只是一个初始零值。在combineByKey中,我们提供了一个函数,该函数将接受当前值作为参数并返回将与其他值合并的新值。
  • 第二个函数是一个合并函数,它采用一个值并将其合并/合并为以前收集的值。
  • 第三个函数将合并的值组合在一起。基本上,此函数采用在分区级别生成的新值并将其组合,直到最终得到一个奇异值。

  • 换句话说,要了解combineByKey,考虑一下它如何处理所处理的每个元素很有用。当combineByKey遍历分区中的元素时,每个元素要么具有一个之前未曾看到的键,要么具有与先前元素相同的键。

    如果是新元素,combineByKey使用我们提供的函数createCombiner()来为该键上的累加器创建初始值。重要的是要注意,这种情况是在每个分区中第一次找到 key 时发生的,而不是在RDD中第一次发现 key 时发生。

    如果它是我们在处理该分区时以前见过的值,它将使用提供的函数mergeValue(),该键的累加器的当前值和新值。

    由于每个分区都是独立处理的,因此对于同一个 key ,我们可以有多个累加器。当我们合并每个分区的结果时,如果两个或多个分区具有相同 key 的累加器,我们将使用用户提供的mergeCombiners()函数合并累加器。

    引用:
  • 学习Spark-Chapter 4
  • Using combineByKey in Apache-Spark博客条目。
  • 关于python - 谁可以在Spark中为 `combineByKey`给出清晰的解释?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33937625/

    10-10 17:58