This article covers how to handle a dynamic (column-dependent) offset in the Spark SQL lead/lag functions.



Can we somehow use an offset that depends on a column value in the lead/lag functions in Spark SQL?


Example: here is what works fine.

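import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val sampleData = Seq(
  ("bob","Developer",125000), ("mark","Developer",108000), ("peter","Developer",185000),
  ("simon","Developer",98000), ("eric","Developer",144000), ("henry","Developer",110000),
  ("carl","Tester",70000), ("jon","Tester",65000), ("roman","Tester",82000),
  ("carlos","Tester",75000)
).toDF("Name","Role","Salary")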

val window = Window.orderBy("Role")

//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(window)

//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")

//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff") === 0, "SAME")
  .when(col("SalaryDiff") > 0, "UP")
  .otherwise("DOWN")

sampleData.withColumn("LastSalary", laggingCol)
  .withColumn("SalaryDiff", salaryDifference)
  .withColumn("Trend", trend).show()


Now, in my use case the offset we have to pass depends on a particular column of type Integer. This is roughly what I wanted to work:

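val sampleData = Seq(
  ("bob","Developer",125000,2), ("mark","Developer",108000,3), ("peter","Developer",185000,2),
  ("simon","Developer",98000,2), ("eric","Developer",144000,3), ("henry","Developer",110000,2),
  ("carl","Tester",70000,3), ("jon","Tester",65000,1), ("roman","Tester",82000,1),
  ("carlos","Tester",75000,2)
).toDF("Name","Role","Salary","ColumnForOffset")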

val window = Window.orderBy("Role")

//Derive lag column for salary, passing a column as the offset (this is the call that fails)
val laggingCol = lag(col("Salary"), col("ColumnForOffset")).over(window)

//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")

//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff") === 0, "SAME")
  .when(col("SalaryDiff") > 0, "UP")
  .otherwise("DOWN")

sampleData.withColumn("LastSalary", laggingCol)
  .withColumn("SalaryDiff", salaryDifference)
  .withColumn("Trend", trend).show()


This fails as expected, since the offset parameter of lag only accepts an Int, not a Column (in the Scala API the call is rejected at compile time). Is there some way to implement this logic?



You can add a row number column, and do a self join based on the row number and offset, e.g.:

val df = sampleData.withColumn("rn", row_number().over(window))

val df2 = df.alias("t1").join(
    df.alias("t2"),
    expr("t1.rn = t2.rn + t1.ColumnForOffset"),
    "left"
).selectExpr("t1.*", "t2.Salary as LastSalary")

df2.show()

+------+---------+------+---------------+---+----------+
|  Name|     Role|Salary|ColumnForOffset| rn|LastSalary|
+------+---------+------+---------------+---+----------+
|   bob|Developer|125000|              2|  1|      null|
|  mark|Developer|108000|              3|  2|      null|
| peter|Developer|185000|              2|  3|    125000|
| simon|Developer| 98000|              2|  4|    108000|
|  eric|Developer|144000|              3|  5|    108000|
| henry|Developer|110000|              2|  6|     98000|
|  carl|   Tester| 70000|              3|  7|     98000|
|   jon|   Tester| 65000|              1|  8|     70000|
| roman|   Tester| 82000|              1|  9|     65000|
|carlos|   Tester| 75000|              2| 10|     65000|
+------+---------+------+---------------+---+----------+
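
To see why the join condition t1.rn = t2.rn + t1.ColumnForOffset behaves like a per-row lag, the same lookup can be modelled on plain Scala collections. This is only a sketch of the arithmetic, not Spark code: Employee, DynamicLagSketch and dynamicLag are hypothetical names made up for the illustration, and the rows are the first five from the table above.

```scala
object DynamicLagSketch {
  // Hypothetical row type: only the fields the lookup needs.
  final case class Employee(name: String, salary: Int, offset: Int)

  // For the row at position rn (1-based, like row_number()), look up the
  // salary at position rn - offset. None plays the role of the null that
  // the left join produces when no matching row exists.
  def dynamicLag(rows: Vector[Employee]): Vector[Option[Int]] =
    rows.zipWithIndex.map { case (row, idx) =>
      val rn = idx + 1              // row number of the current row
      val target = rn - row.offset  // t2.rn = t1.rn - t1.ColumnForOffset
      rows.lift(target - 1).map(_.salary) // lift yields None when out of range
    }

  // First five rows of the result table, in row-number order.
  val rows: Vector[Employee] = Vector(
    Employee("bob", 125000, 2),
    Employee("mark", 108000, 3),
    Employee("peter", 185000, 2),
    Employee("simon", 98000, 2),
    Employee("eric", 144000, 3)
  )

  def main(args: Array[String]): Unit =
    rows.zip(dynamicLag(rows)).foreach { case (e, last) =>
      println(s"${e.name}: ${last.fold("null")(_.toString)}")
    }
}
```

The first two rows have no row at rn - offset, so they get no value, which matches the nulls in the LastSalary column. In Spark the self join does this lookup for every row at once, which is why the row number has to be materialised as a column first.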

