问题描述
我试图在Spark框架中使用scala编写一个内联函数,它将接受一个字符串输入,执行一个sql语句并返回一个字符串值val testfunc:(String => String)=(arg1:String)=>
{val k = sqlContext.sql(从r_c_tbl中选择c_code,其中x_nm =something)
k.head()。getString(0)
}
我将这个scala函数注册为UDF
val testFunc_test = udf(testFunc)
我有一个数据框通过配置单元表
val df = sqlContext.table(some_table)
然后我在withColumn中调用udf并尝试将它保存在一个新的数据框中。
val new_df = df.withColumn(test,testFunc_test($col1))
但每次我尝试做这个我都会得到一个错误
16/08/10 21: 17:08 WARN TaskSetManager:在阶段1.0(TID 1,10.0.1.5)中丢失任务0.0:java.lang.NullPointerException $ b $在org.apache.spark.sql.execution.SQLExecution $ .withNewExecutionId(SQLExecution.scala: 41)
在org.apache.sp
at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434)
我对spark和scala相对较新。但我不知道为什么这个代码不应该运行。任何见解或解决方法将受到高度赞赏。
请注意,我没有粘贴整个错误堆栈。请让我知道是否需要。
sqlContext 在你的UDF中 - UDF必须是序列化的才能发送给执行者,并且上下文(可以认为是到集群的连接)不能被序列化并发送到节点 - 只有驱动程序应用程序可以使用 sqlContext 。
$可以使用UDF定义,但不能执行 b $ b看起来像你的用例(从表Y的每条记录的表X执行选择)最好通过使用 join 来完成。
I am trying to write a inline function in spark framework using scala which will take a string input, execute a sql statement and return me a String value
val testfunc: (String=>String)= (arg1:String) => {val k = sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """) k.head().getString(0) }I am registering this scala function as an UDF
val testFunc_test = udf(testFunc)I have a dataframe over a hive table
val df = sqlContext.table("some_table")Then I am calling the udf in a withColumn and trying to save it in a new dataframe.
val new_df = df.withColumn("test", testFunc_test($"col1"))But everytime i try do this i get an error
16/08/10 21:17:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 10.0.1.5): java.lang.NullPointerException at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:41) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086) at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434)I am relatively new to spark and scala . But I am not sure why this code should not run. Any insights or an work around will be highly appreciated.
Please note that I have not pasted the whole error stack . Please let me know if it is required.
解决方案You can't use sqlContext in your UDF - UDFs must be serializable to be shipped to executors, and the context (which can be thought of as a connection to the cluster) can't be serialized and sent to the node - only the driver application (where the UDF is defined, but not executed) can use the sqlContext.
Looks like your usecase (perform a select from table X per record in table Y) would better be accomplished by using a join.
这篇关于尝试从UDF执行spark sql查询的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!