假设您有一个文件,我们称它为udfs.py
并在其中:
def nested_f(x):
return x + 1
def main_f(x):
return nested_f(x) + 1
然后,您想使用
main_f
函数制作UDF并在数据帧上运行它:import pyspark.sql.functions as fn
import pandas as pd
pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)
_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
如果我们在与定义两个函数的位置相同的文件中进行此操作,则此操作正常(
udfs.py
)。但是,尝试从其他文件(例如main.py
)执行此操作会产生错误ModuleNotFoundError: No module named ...
:...
import udfs
_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
我注意到,如果我实际上将
nested_f
嵌套在main_f
内,如下所示:def main_f(x):
def nested_f(x):
return x + 1
return nested_f(x) + 1
一切正常。但是,我的目标是将逻辑很好地分离成多个功能,也可以分别进行测试。
我认为可以通过使用
udfs.py
将spark.sparkContext.addPyFile('...udfs.py')
文件(或整个压缩文件夹)提交给执行者来解决。然而:我觉得这有点long(尤其是如果您需要压缩文件夹等...)
这并不总是容易/可能的(例如
udfs.py
可能正在使用许多其他模块,然后还需要提交其他模块,从而导致一些连锁反应...)addPyFile
还有其他一些不便之处(例如autoreload can stop working等)所以问题是:有没有办法同时做所有这些事情:
将UDF的逻辑很好地拆分为多个Python函数
使用与定义逻辑所在位置不同的文件中的UDF
不需要使用
addPyFile
提交任何依赖项奖励积分,以阐明其工作原理/为什么不起作用!
最佳答案
对于较小的(一个或两个本地文件)依赖项,可以使用--py-files并枚举它们,并具有更大或更多的依赖项-最好将其打包为zip或egg文件。
文件udfs.py
:
def my_function(*args, **kwargs):
# code
文件
main.py
:from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function
sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)
df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))
运行:
pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py
如果您编写了自己的Python模块或什至是第三方模块(不需要C编译),那么我个人需要使用
geoip2
,最好创建一个zip或egg文件。# pip with -t install all modules and dependencies in directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src
# Best is
pip install -r requirements.txt -t ./src
# If you need add some additionals files
cp ./some_scripts/* ./src/
# And pack it
cd ./src
zip -r ../libs.zip .
cd ..
pyspark --py-files libs.zip
spark-submit --py-files libs.zip
在
pyspark --master yarn
的pyspark shell中使用--py-files
(可能与其他非本地主选项)时要小心:>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip') # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule # libs.zip/MyModule
编辑-关于如何在没有
addPyFile ()
和--py-files
的执行程序上获取功能的问题的答案:必须具有给定文件,该文件具有在单个执行程序上的功能。并可以通过PATH env到达。
因此,我可能会编写一个Python模块,然后将其安装在执行程序上并在环境中可用。