尝试在Spark 2.4.4中实现SCD Type 2逻辑。我有两个数据框;一个包含“现有数据”,另一个包含“新传入数据”。
输入和预期输出如下。需要发生的是:
1)所有传入的行都应附加到现有数据之后。
2)只有随后的3行之前处于“活动状态”,才应变为非活动状态,并填充以下适当的“ endDate”:
pk=1, amount = 20 => Row should become 'inactive' & 'endDate' is the 'startDate' of following row (Lead)
pk=2, amount = 100 => Row should become 'inactive' & 'endDate' is the 'startDate' of following row (Lead)
pk=3, amount = 750 => Row should become 'inactive' & 'endDate' is the 'startDate' of following row (Lead)
如何在Spark中执行此操作?请帮忙。
现有数据:
+---+------+-------------------+-------------------+------+
| pk|amount| startDate| endDate|active|
+---+------+-------------------+-------------------+------+
| 1| 10|2019-01-01 12:00:00|2019-01-20 05:00:00| 0|
| 1| 20|2019-01-20 05:00:00| null| 1|
| 2| 100|2019-01-01 00:00:00| null| 1|
| 3| 75|2019-01-01 06:00:00|2019-01-26 08:00:00| 0|
| 3| 750|2019-01-26 08:00:00| null| 1|
| 10| 40|2019-01-01 00:00:00| null| 1|
+---+------+-------------------+-------------------+------+
新的传入数据:
+---+------+-------------------+-------------------+------+
| pk|amount| startDate| endDate|active|
+---+------+-------------------+-------------------+------+
| 1| 50|2019-02-01 07:00:00|2019-02-02 08:00:00| 0|
| 1| 75|2019-02-02 08:00:00| null| 1|
| 2| 200|2019-02-01 05:00:00|2019-02-01 13:00:00| 0|
| 2| 60|2019-02-01 13:00:00|2019-02-01 19:00:00| 0|
| 2| 500|2019-02-01 19:00:00| null| 1|
| 3| 175|2019-02-01 00:00:00| null| 1|
| 4| 50|2019-02-02 12:00:00|2019-02-02 14:00:00| 0|
| 4| 300|2019-02-02 14:00:00| null| 1|
| 5| 500|2019-02-02 00:00:00| null| 1|
+---+------+-------------------+-------------------+------+
预期产量:
+---+------+-------------------+-------------------+------+
| pk|amount| startDate| endDate|active|
+---+------+-------------------+-------------------+------+
| 1| 10|2019-01-01 12:00:00|2019-01-20 05:00:00| 0|
| 1| 20|2019-01-20 05:00:00|2019-02-01 07:00:00| 0|
| 1| 50|2019-02-01 07:00:00|2019-02-02 08:00:00| 0|
| 1| 75|2019-02-02 08:00:00| null| 1|
| 2| 100|2019-01-01 00:00:00|2019-02-01 05:00:00| 0|
| 2| 200|2019-02-01 05:00:00|2019-02-01 13:00:00| 0|
| 2| 60|2019-02-01 13:00:00|2019-02-01 19:00:00| 0|
| 2| 500|2019-02-01 19:00:00| null| 1|
| 3| 75|2019-01-01 06:00:00|2019-01-26 08:00:00| 0|
| 3| 750|2019-01-26 08:00:00|2019-02-01 00:00:00| 1|
| 3| 175|2019-02-01 00:00:00| null| 1|
| 4| 50|2019-02-02 12:00:00|2019-02-02 14:00:00| 0|
| 4| 300|2019-02-02 14:00:00| null| 1|
| 5| 500|2019-02-02 00:00:00| null| 1|
| 10| 40|2019-01-01 00:00:00| null| 1|
+---+------+-------------------+-------------------+------+
最佳答案
您可以从新的DataFrame中为每个组startDate
选择第一个pk
,然后与旧的联接以更新所需的列。
然后,您可以合并所有联接结果和新的DataFrame。
像这样:
// get first state by date for each pk group
val w = Window.partitionBy($"pk").orderBy($"startDate")
val updates = df_new.withColumn("rn", row_number.over(w)).filter("rn = 1").select($"pk", $"startDate")
// join with old data and update old values when there is match
val joinOldNew = df_old.join(updates.alias("new"), Seq("pk"), "left")
.withColumn("endDate", when($"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull, $"new.startDate").otherwise($"endDate"))
.withColumn("active", when($"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull, lit(0)).otherwise($"active"))
.drop($"new.startDate")
// union all
val result = joinOldNew.unionAll(df_new)