我正在从Jupyter控制台执行一些简单的PySpark,当我尝试调用一些外部代码时遇到了问题。我的最小示例代码具有一个依赖性testpackage.zip。当我调用使用该包中的代码的UserDefinedFunction时,得到一个AttributeError,它显示Spark在testpackage.zip中找不到该函数。

testpackage是一个目录,其中包含一个空的__init__.pytestmod.py,其中包含



def testfunc(x):
    return float(x)+1.33


最少的示例代码,我在Jupyter Console中一次运行一个代码块:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction as udf
from pyspark.sql.types import DoubleType

sess = SparkSession.builder.appName("testing").getOrCreate()
sc = sess.sparkContext

DEP_PATH = < path on driver >.testpackage.zip
sc.addPyFile(DEP_PATH)
import testpackage

df = sess.range(0, 12*1000*1000, numPartitions=12)

test_udf = udf(lambda x: testpackage.testmod.testfunc(x), DoubleType())

df = df.withColumn("udf_res", test_udf(df.id))

df.head(5) # error


请注意(依赖项所在的目录)在我的PYTHONPATH中。

错误消息部分读取:


  “ AttributeError:模块'testpackage'没有属性'testmod'”。


我不确定该问题是否与我在udf中调用testfunc的事实有关,或者是否只是没有正确添加依赖项。以编程方式(不使用spark-submit)添加Python依赖项的最佳方法是什么?

最佳答案

您需要通过以下方式导入模块。您能不能尝试以下-

从testpackage.testmod import *

test_udf = udf(lambda x: testfunc(x), DoubleType())

08-25 09:00