仅出于学习目的,我尝试将字典设置为累加器中的全局变量,add 函数运行良好,但我运行代码并将字典放入 map 函数中,它始终返回空。
但是将列表设置为全局变量的类似代码
class DictParam(AccumulatorParam):
def zero(self, value = ""):
return dict()
def addInPlace(self, acc1, acc2):
acc1.update(acc2)
if __name__== "__main__":
sc, sqlContext = init_spark("generate_score_summary", 40)
rdd = sc.textFile('input')
#print(rdd.take(5))
dict1 = sc.accumulator({}, DictParam())
def file_read(line):
global dict1
ls = re.split(',', line)
dict1+={ls[0]:ls[1]}
return line
rdd = rdd.map(lambda x: file_read(x)).cache()
print(dict1)
最佳答案
我相信 print(dict1())
只是在 rdd.map()
之前执行。
在 Spark 中,有两种类型的 operations :
累加器仅在 some action is executed 时更新:
如果您查看文档本部分的末尾,则会有一个与您的示例完全相同的示例:
accum = sc.accumulator(0)
def g(x):
accum.add(x)
return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
因此,您需要添加一些操作,例如:
rdd = rdd.map(lambda x: file_read(x)).cache() # transformation
foo = rdd.count() # action
print(dict1)
请务必检查各种 RDD 函数和累加器特性的详细信息,因为这可能会影响结果的正确性。 (例如,
rdd.take(n)
将默认为 only scan one partition ,而不是整个数据集。)关于dictionary - pyspark 中的累加器,以 dict 作为全局变量,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44640184/