问题描述
注意:该问题与该问题相关:
Note: this question is linked from this question: Creting UDF function with NonPrimitive Data Type and using in Spark-sql Query: Scala
我已经在scala中创建了一个方法:
I have created a method in scala:
package test.udf.demo
object UDF_Class {
def transformDate( dateColumn: String, df: DataFrame) : DataFrame = {
val sparksession = SparkSession.builder().appName("App").getOrCreate()
val d=df.withColumn("calculatedCol", month(to_date(from_unixtime(unix_timestamp(col(dateColumn), "dd-MM-yyyy")))))
df.withColumn("date1", when(col("calculatedCol") === "01", concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1, lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM- yyyy"))),3,4))
.when(col("calculatedCol") === "02",concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1, lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM- yyyy"))),3,4)))
.when(col("calculatedCol") === "03",concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1, lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM-yyyy"))),3,4)))
.otherwise(concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy")))), lit('-')), substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM-yyyy")))) + 1, 3, 4)))))
val d1=sparksession.udf.register("transform",transformDate _)
d
}
}
我想在我的sparksql查询中使用此transformDate方法,该方法是同一包中的单独scala代码.
I want to use this transformDate method in my sparksql query which is separate scala code in same package.
package test.udf.demo
import test.udf.demo.transformDate
//sparksession
sparksession.sql("select id,name,salary,transform(dob) from dbname.tablename")
但出现错误
有人可以引导我吗?
推荐答案
首先,Spark SQL UDF是基于行的函数.不是基于数据框的方法. UDF聚合也需要一系列Row.因此,UDF定义是错误的.如果我正确理解了您的要求,那么您想创建一个Case语句的可配置表达式.它可以通过expr()
First of all Spark SQL UDF is a Row based function. Not a Dataframe based method. Aggregate UDF also takes a series of Row. So the UDF definition is wrong. If I understood your requirement correctly you want to create a configurable expression of Case statements. It can be easily achieved by expr()
import spark.implicits._
val exprStr = "case when calculatedCol='01' then <here goes your code statements> as FP"
val modifiedDf = sql("""select id,name,salary,$exprStr from dbname.tablename""")
它将起作用
这篇关于为日期创建和使用Spark-Hive UDF的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!