是否可以将额外的参数传递给pySpark中的映射函数?
具体来说,我有以下代码配方:

raw_data_rdd = sc.textFile("data.json", use_unicode=True)
json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line))
mapped_rdd = json_data_rdd.flatMap(processDataLine)

除了JSON对象外,函数processDataLine还接受其他参数,例如:
def processDataLine(dataline, arg1, arg2)

如何将额外的参数arg1arg2传递给flaMap函数?

最佳答案

  • 您可以直接在flatMap中使用匿名函数
    json_data_rdd.flatMap(lambda j: processDataLine(j, arg1, arg2))
    

    或 curry processDataLine
    f = lambda j: processDataLine(dataline, arg1, arg2)
    json_data_rdd.flatMap(f)
    
  • 您可以像这样生成processDataLine:
    def processDataLine(arg1, arg2):
        def _processDataLine(dataline):
            return ... # Do something with dataline, arg1, arg2
        return _processDataLine
    
    json_data_rdd.flatMap(processDataLine(arg1, arg2))
    
  • toolz 库提供了有用的curry装饰器:
    from toolz.functoolz import curry
    
    @curry
    def processDataLine(arg1, arg2, dataline):
        return ... # Do something with dataline, arg1, arg2
    
    json_data_rdd.flatMap(processDataLine(arg1, arg2))
    

    请注意,我已将dataline参数推到最后一个位置。这不是必需的,但是通过这种方式,我们不必使用关键字args。
  • 最后, functools.partial 在注释中已经提到了Avihoo Mamka
  • 10-02 09:13
    查看更多