我的问题与PySpark reduceByKey on multiple values相似,但有一些关键的区别。我是PySpark的新手,所以我肯定会丢失一些明显的东西。

我有一个具有以下结构的RDD:

(K0, ((k01,v01), (k02,v02), ...))
....
(Kn, ((kn1,vn1), (kn2,vn2), ...))


我想要的输出就像

(K0, v01+v02+...)
...
(Kn, vn1+vn2+...)


这似乎是使用reduceByKey的完美案例,起初我想到的是

rdd.reduceByKey(lambda x,y: x[1]+y[1])


这给了我刚开始的RDD。我想我的索引有问题,因为有嵌套的元组,但是我尝试了我能想到的所有可能的索引组合,并且它一直在给我最初的RDD。

也许有原因为什么它不应该与嵌套元组一起工作还是我做错了什么?

最佳答案

您完全不应在此处使用reduceByKey。它具有带签名的关联和交换功能。 (T, T) => T。很明显,当您以List[Tuple[U, T]]作为输入并且希望以T作为输出时,它不适用。

由于尚不清楚是否使用键或唯一键,因此在我们必须本地和全局汇总时,请考虑一般示例。假设v01v02,... vm是简单数字:

from functools import reduce
from operator import add

def agg_(xs):
    # For numeric values sum would be more idiomatic
    # but lets make it more generic
    return reduce(add, (x[1] for x in xs), zero_value)

zero_value = 0
merge_op = add
def seq_op(acc, xs):
    return acc + agg_(xs)

rdd = sc.parallelize([
    ("K0", (("k01", 3), ("k02", 2))),
    ("K0", (("k03", 5), ("k04", 6))),
    ("K1", (("k11", 0), ("k12", -1)))])

rdd.aggregateByKey(0, seq_op, merge_op).take(2)
## [('K0', 16), ('K1', -1)]


如果键已经是唯一的,那么简单的mapValues就足够了:

from itertools import chain

unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x)))
unique_keys.mapValues(agg_).take(2)
## [('K0', 16), ('K1', -1)]

关于python - 嵌套元组上的Pyspark reduceByKey,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/39054489/

10-12 06:19