本文介绍了SparkSQL Lead/Lag 函数中的动态/变量偏移的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我们能否以某种方式使用取决于 spark SQL 中的领先/滞后函数中的列值的偏移值?
Can we somehow use an offset value that depends on the column value in lead/lag function in spark SQL ?
示例:以下是工作正常的方法.
Example : Here is what works fine.
val sampleData = Seq( ("bob","Developer",125000),
("mark","Developer",108000),
("carl","Tester",70000),
("peter","Developer",185000),
("jon","Tester",65000),
("roman","Tester",82000),
("simon","Developer",98000),
("eric","Developer",144000),
("carlos","Tester",75000),
("henry","Developer",110000)).toDF("Name","Role","Salary")
val window = Window.orderBy("Role")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
sampleData.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
现在,我的用例是我们必须传递的偏移量取决于整数类型的特定列.这有点我想工作:
Now, my use case is such that the offset that we have to pass depends on a particular Column of type Integer. This is somewhat I wanted to work :
val sampleData = Seq( ("bob","Developer",125000,2),
("mark","Developer",108000,3),
("carl","Tester",70000,3),
("peter","Developer",185000,2),
("jon","Tester",65000,1),
("roman","Tester",82000,1),
("simon","Developer",98000,2),
("eric","Developer",144000,3),
("carlos","Tester",75000,2),
("henry","Developer",110000,2)).toDF("Name","Role","Salary","ColumnForOffset")
val window = Window.orderBy("Role")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), col("ColumnForOffset")).over(window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
sampleData.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
这将按预期抛出异常,因为偏移量仅采用整数值.让我们讨论一下是否可以为此实现某种逻辑.
This will throw an exception as expected since offset only takes Integer value.Let us discuss if we can somehow implement a logic for this.
推荐答案
可以添加行号列,并根据行号和偏移量进行自连接,例如:
You can add a row number column, and do a self join based on the row number and offset, e.g.:
val df = sampleData.withColumn("rn", row_number().over(window))
val df2 = df.alias("t1").join(
df.alias("t2"),
expr("t1.rn = t2.rn + t1.ColumnForOffset"),
"left"
).selectExpr("t1.*", "t2.Salary as LastSalary")
df2.show
+------+---------+------+---------------+---+----------+
| Name| Role|Salary|ColumnForOffset| rn|LastSalary|
+------+---------+------+---------------+---+----------+
| bob|Developer|125000| 2| 1| null|
| mark|Developer|108000| 3| 2| null|
| peter|Developer|185000| 2| 3| 125000|
| simon|Developer| 98000| 2| 4| 108000|
| eric|Developer|144000| 3| 5| 108000|
| henry|Developer|110000| 2| 6| 98000|
| carl| Tester| 70000| 3| 7| 98000|
| jon| Tester| 65000| 1| 8| 70000|
| roman| Tester| 82000| 1| 9| 65000|
|carlos| Tester| 75000| 2| 10| 65000|
+------+---------+------+---------------+---+----------+
这篇关于SparkSQL Lead/Lag 函数中的动态/变量偏移的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!