我正在编写一个大型PySpark程序,最近在RDD上使用reduceByKey时遇到了麻烦。我已经能够通过一个简单的测试程序来重现该问题。代码是:

from pyspark import SparkConf, SparkContext

APP_NAME = 'Test App'

def main(sc):
    test = [(0, [i]) for i in xrange(100)]
    test = sc.parallelize(test)
    test = test.reduceByKey(method)
    print test.collect()

def method(x, y):
    x.append(y[0])
    return x

if __name__ == '__main__':
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster('local[*]')
    sc = SparkContext(conf=conf)

    main(sc)


我希望基于Spark文档的输出为(0, [0,1,2,3,4,...,98,99])。相反,我得到以下输出:

[(0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 36, 48, 60, 72, 84])]


有人可以帮我理解为什么生成此输出吗?

附带一提,当我使用

def method(x, y):
    x = x + y
    return x


我得到了预期的输出。

最佳答案

首先,看起来您实际上想要groupByKey而不是reduceByKey

rdd = sc.parallelize([(0, i) for i in xrange(100)])
grouped = rdd.groupByKey()
k, vs = grouped.first()
assert len(list(vs)) == 100



  有人可以帮我理解为什么生成此输出吗?


reduceByKey assumes表示fassociative,而您的method显然不是。根据操作顺序,输出是不同的。假设您从以下某个键的数据开始:

[1], [2], [3], [4]


现在添加让我们添加一些括号:


((([1], [2]), [3]), [4])
(([1, 2], [3]), [4])
([1, 2, 3], [4])
[1, 2, 3, 4]


并加上另一组括号


(([1], ([2], [3])), [4])
(([1], [2, 3]), [4])
([1, 2], [4])
[1, 2, 4]


重写时,如下所示:

method = lambda x, y: x + y


或简单地

from operator import add
method = add


您将获得一个关联函数,它可以按预期工作。

一般来说,对于reduce*操作,您需要既具有关联性又具有commutative的功能。

关于python - PySpark的reduceByKey无法按预期工作,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33059652/

10-11 06:29