仅出于学习目的,我尝试将字典设置为累加器中的全局变量,add 函数运行良好,但我运行代码并将字典放入 map 函数中,它始终返回空。

但是将列表设置为全局变量的类似代码

class DictParam(AccumulatorParam):
    def zero(self,  value = ""):
        return dict()

    def addInPlace(self, acc1, acc2):
        acc1.update(acc2)


if  __name__== "__main__":
    sc, sqlContext = init_spark("generate_score_summary", 40)
    rdd = sc.textFile('input')
    #print(rdd.take(5))



    dict1 = sc.accumulator({}, DictParam())


    def file_read(line):
        global dict1
        ls = re.split(',', line)
        dict1+={ls[0]:ls[1]}
        return line


    rdd = rdd.map(lambda x: file_read(x)).cache()
    print(dict1)

最佳答案

我相信 print(dict1()) 只是在 rdd.map() 之前执行。

在 Spark 中,有两种类型的 operations :

  • 转换,描述 future 的计算
  • 和 Action ,即调用 Action ,并实际触发执行

  • 累加器仅在 some action is executed 时更新:



    如果您查看文档本部分的末尾,则会有一个与您的示例完全相同的示例:
    accum = sc.accumulator(0)
    def g(x):
        accum.add(x)
        return f(x)
    data.map(g)
    # Here, accum is still 0 because no actions have caused the `map` to be computed.
    

    因此,您需要添加一些操作,例如:
    rdd = rdd.map(lambda x: file_read(x)).cache() # transformation
    foo = rdd.count() # action
    print(dict1)
    

    请务必检查各种 RDD 函数和累加器特性的详细信息,因为这可能会影响结果的正确性。 (例如, rdd.take(n) 将默认为 only scan one partition ,而不是整个数据集。)

    关于dictionary - pyspark 中的累加器,以 dict 作为全局变量,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44640184/

    10-11 07:38