Problem Description
I am trying to write an inline function in the Spark framework using Scala which will take a String input, execute a SQL statement, and return a String value.
val testFunc: (String => String) = (arg1: String) => {
  val k = sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """)
  k.head().getString(0)
}
I am registering this Scala function as a UDF:
val testFunc_test = udf(testFunc)
I have a DataFrame over a Hive table:
val df = sqlContext.table("some_table")
Then I call the UDF in a withColumn and try to save the result in a new DataFrame:
val new_df = df.withColumn("test", testFunc_test($"col1"))
But every time I try to do this I get an error:
16/08/10 21:17:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 10.0.1.5): java.lang.NullPointerException
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:41)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434)
I am relatively new to Spark and Scala, but I am not sure why this code should not run. Any insights or a workaround would be highly appreciated.
Please note that I have not pasted the whole error stack; let me know if it is required.
Recommended Answer
You can't use sqlContext in your UDF. UDFs must be serializable so they can be shipped to executors, and the context (which can be thought of as a connection to the cluster) can't be serialized and sent to the nodes. Only the driver application (where the UDF is defined, but not executed) can use the sqlContext; referencing it from inside a UDF running on an executor produces the NullPointerException you see.
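A minimal sketch of what the driver-only rule means in practice (my illustration, not part of the original answer): since the query in the question filters on a fixed literal rather than on arg1, you can run it once on the driver, capture the plain String, and let the UDF close over that value. The names codeValue and lookupUdf are hypothetical.

// Runs on the driver, where sqlContext is valid.
val codeValue: String =
  sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """)
    .head().getString(0)

// The UDF now closes over a plain String, which serializes cleanly.
val lookupUdf = udf((arg1: String) => codeValue)
val new_df = df.withColumn("test", lookupUdf($"col1"))

And since the value does not depend on arg1 at all here, df.withColumn("test", lit(codeValue)) would achieve the same result without a UDF.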
It looks like your use case (perform a select from table X per record in table Y) would be better accomplished by using a join.
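A sketch of that join, under the assumption (hypothetical, based on the tables in the question) that df.col1 is meant to be matched against r_c_tbl.x_nm:

// Look c_code up via a join instead of issuing a query per row from a UDF.
val lookup = sqlContext.table("r_c_tbl").select($"x_nm", $"c_code")

val new_df = df.join(lookup, df("col1") === lookup("x_nm"), "left_outer")
               .withColumnRenamed("c_code", "test")
               .drop("x_nm")

The left outer join keeps rows of df that have no match in r_c_tbl, leaving test as null for them.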