我正在尝试使用Scala在Spark框架中编写内联函数,该函数将接受字符串输入,执行sql语句并返回一个String值
val testfunc: (String=>String)= (arg1:String) =>
{val k = sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """)
k.head().getString(0)
}
我正在将此Scala函数注册为UDF
val testFunc_test = udf(testFunc)
我在 hive 表上有一个数据框
val df = sqlContext.table("some_table")
然后,我在withColumn中调用udf,并尝试将其保存在新的数据框中。
val new_df = df.withColumn("test", testFunc_test($"col1"))
但是每次我尝试这样做我都会收到一个错误
16/08/10 21:17:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 10.0.1.5): java.lang.NullPointerException
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:41)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434)
我是Spark和Scala的新手。但是我不确定为什么不应该运行此代码。任何见解或变通将不胜感激。
请注意,我还没有粘贴整个错误堆栈。请让我知道是否需要。
最佳答案
您不能在UDF中使用sqlContext
-UDF必须可序列化才能交付给执行者,并且上下文(可以视为与群集的连接)不能被序列化并发送到节点-仅驱动程序应用程序(在其中定义了UDF但未执行的应用程序)可以使用sqlContext
。
看起来您的用例(对表Y中的每个记录从表X中进行选择)最好使用join
来完成。