我目前正在使用 Spark 2.1,并且有一个主脚本调用包含我所有转换方法的辅助模块。换句话说:

main.py
helper.py

在我的 helper.py 文件的顶部,我有几个自定义 UDF,我已按以下方式定义:
def reformat(s):
  return reformat_logic(s)
reformat_udf = udf(reformat, StringType())

在我将所有 UDF 拆分到帮助文件中之前,我能够使用 spark.sql('sql statement') 通过我的 SparkSession 对象连接到我的 Hive 元存储。但是,在我将 UDF 移动到帮助程序文件并在主脚本顶部导入该文件后,SparkSession 对象无法再连接到 Hive 并返回到默认的 Derby 数据库。尝试查询我的 Hive 表(例如 Hive support is required to insert into the following tables...)时,我也会出错

我已经能够通过将我的 UDF 移动到一个完全独立的文件中并且只在需要它们的函数中运行该模块的导入语句来解决我的问题(不确定这是否是好的做法,但它有效)。无论如何,有没有人理解为什么我在 Spark 和 UDF 方面看到如此奇特的行为?有没有人知道跨应用程序共享 UDF 的好方法?

最佳答案

在 Spark 2.2.0 之前 UserDefinedFunction 急切地创建 UserDefinedPythonFunction 对象,它代表 JVM 上的 Python UDF。此过程需要访问 SparkContextSparkSession 。如果调用 UserDefinedFunction.__init__ 时没有事件实例,Spark 将自动为您初始化上下文。

当您在导入 SparkSession.Builder.getOrCreate 对象后调用 UserDefinedFunction 时,它​​会返回现有的 SparkSession 实例,并且只能应用一些配置更改(enableHiveSupport 不在其中)。

要解决此问题,您应该在导入 UDF 之前初始化 SparkSession:

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

from helper import reformat_udf

此行为在 SPARK-19163 中描述并在 Spark 2.2.0 中修复。其他 API 改进包括装饰器语法 ( SPARK-19160 ) 和改进的文档字符串处理 ( SPARK-19161 )。

关于python - PySpark 2.1 : Importing module with UDF's breaks Hive connectivity,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43795915/

10-16 02:05