我在 pyspark udf 函数中有问题,我想打印产生问题的行数。

我尝试使用 Python 中的“静态变量”等价物来计算行数,以便当用新行调用 udf 时,计数器会增加。但是,它不起作用:

import pyspark.sql.functions as F
def myF(input):
    myF.lineNumber += 1
    if (somethingBad):
        print(myF.lineNumber)
    return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())

如何计算调用 udf 的次数,以便找到在 pyspark 中生成问题的行数?

最佳答案

udfs 在工作人员处执行,因此其中的打印语句不会显示在输出中(来自驱动程序)。处理 UDF 问题的最佳方法是将 UDF 的返回类型更改为结构或列表,并将错误信息与返回的输出一起传递。在下面的代码中,我只是将错误信息添加到您最初返回的字符串 res 中。

import pyspark.sql.functions as F
def myF(input):
  myF.lineNumber += 1
  if (somethingBad):
    res += 'Error in line {}".format(myF.lineNumber)
  return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())

关于python - pyspark udf 打印正在分析的行,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54252682/

10-12 19:35