I am trying to implement SCD Type 2 logic in Spark 2.4.4. I have two DataFrames: one holds the existing data and the other holds the new incoming data.

The input and expected output are shown below. What needs to happen is:

1) All incoming rows should be appended to the existing data.

2) The following 3 rows, which were previously 'active', should become inactive and have the appropriate 'endDate' populated:

pk=1, amount = 20  => row should become 'inactive' and 'endDate' should be the 'startDate' of the following row (lead)

pk=2, amount = 100 => row should become 'inactive' and 'endDate' should be the 'startDate' of the following row (lead)

pk=3, amount = 750 => row should become 'inactive' and 'endDate' should be the 'startDate' of the following row (lead)


How can I do this in Spark? Any help is appreciated.

Existing data:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    10|2019-01-01 12:00:00|2019-01-20 05:00:00|     0|
|  1|    20|2019-01-20 05:00:00|               null|     1|
|  2|   100|2019-01-01 00:00:00|               null|     1|
|  3|    75|2019-01-01 06:00:00|2019-01-26 08:00:00|     0|
|  3|   750|2019-01-26 08:00:00|               null|     1|
| 10|    40|2019-01-01 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+


New incoming data:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    50|2019-02-01 07:00:00|2019-02-02 08:00:00|     0|
|  1|    75|2019-02-02 08:00:00|               null|     1|
|  2|   200|2019-02-01 05:00:00|2019-02-01 13:00:00|     0|
|  2|    60|2019-02-01 13:00:00|2019-02-01 19:00:00|     0|
|  2|   500|2019-02-01 19:00:00|               null|     1|
|  3|   175|2019-02-01 00:00:00|               null|     1|
|  4|    50|2019-02-02 12:00:00|2019-02-02 14:00:00|     0|
|  4|   300|2019-02-02 14:00:00|               null|     1|
|  5|   500|2019-02-02 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+


Expected output:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    10|2019-01-01 12:00:00|2019-01-20 05:00:00|     0|
|  1|    20|2019-01-20 05:00:00|2019-02-01 07:00:00|     0|
|  1|    50|2019-02-01 07:00:00|2019-02-02 08:00:00|     0|
|  1|    75|2019-02-02 08:00:00|               null|     1|
|  2|   100|2019-01-01 00:00:00|2019-02-01 05:00:00|     0|
|  2|   200|2019-02-01 05:00:00|2019-02-01 13:00:00|     0|
|  2|    60|2019-02-01 13:00:00|2019-02-01 19:00:00|     0|
|  2|   500|2019-02-01 19:00:00|               null|     1|
|  3|    75|2019-01-01 06:00:00|2019-01-26 08:00:00|     0|
|  3|   750|2019-01-26 08:00:00|2019-02-01 00:00:00|     1|
|  3|   175|2019-02-01 00:00:00|               null|     1|
|  4|    50|2019-02-02 12:00:00|2019-02-02 14:00:00|     0|
|  4|   300|2019-02-02 14:00:00|               null|     1|
|  5|   500|2019-02-02 00:00:00|               null|     1|
| 10|    40|2019-01-01 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+
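
For reference, here is a minimal sketch of how the two inputs could be built for testing. The names df_old and df_new match the ones used in the answer below; building them from tuples and casting the date strings to timestamp is an assumption, since the question does not show how the data is actually loaded:

import spark.implicits._               // assumes the SparkSession is in scope as `spark`
import org.apache.spark.sql.functions.col

val cols = Seq("pk", "amount", "startDate", "endDate", "active")

val df_old = Seq(
  (1,  10,  "2019-01-01 12:00:00", Some("2019-01-20 05:00:00"), 0),
  (1,  20,  "2019-01-20 05:00:00", None,                        1),
  (2,  100, "2019-01-01 00:00:00", None,                        1),
  (3,  75,  "2019-01-01 06:00:00", Some("2019-01-26 08:00:00"), 0),
  (3,  750, "2019-01-26 08:00:00", None,                        1),
  (10, 40,  "2019-01-01 00:00:00", None,                        1)
).toDF(cols: _*)
 .withColumn("startDate", col("startDate").cast("timestamp"))
 .withColumn("endDate",   col("endDate").cast("timestamp"))

val df_new = Seq(
  (1, 50,  "2019-02-01 07:00:00", Some("2019-02-02 08:00:00"), 0),
  (1, 75,  "2019-02-02 08:00:00", None,                        1),
  (2, 200, "2019-02-01 05:00:00", Some("2019-02-01 13:00:00"), 0),
  (2, 60,  "2019-02-01 13:00:00", Some("2019-02-01 19:00:00"), 0),
  (2, 500, "2019-02-01 19:00:00", None,                        1),
  (3, 175, "2019-02-01 00:00:00", None,                        1),
  (4, 50,  "2019-02-02 12:00:00", Some("2019-02-02 14:00:00"), 0),
  (4, 300, "2019-02-02 14:00:00", None,                        1),
  (5, 500, "2019-02-02 00:00:00", None,                        1)
).toDF(cols: _*)
 .withColumn("startDate", col("startDate").cast("timestamp"))
 .withColumn("endDate",   col("endDate").cast("timestamp"))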

Best answer

You can take the first startDate for each pk group from the new DataFrame, then join that back to the old DataFrame to update the columns that need to change.
After that, union the join result with the new DataFrame.

Something like this:

import spark.implicits._                                   // assumes the SparkSession is in scope as `spark`
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number, when}

// earliest incoming startDate per pk group: the cut-off that closes the currently active old row
val w = Window.partitionBy($"pk").orderBy($"startDate")
val updates = df_new
  .withColumn("rn", row_number().over(w))
  .filter("rn = 1")
  .select($"pk", $"startDate")

// left-join the old data with the cut-off dates; flag the rows to close first,
// so that overwriting endDate does not affect the subsequent active update
val joinOldNew = df_old.join(updates.alias("new"), Seq("pk"), "left")
  .withColumn("toClose", $"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull)
  .withColumn("endDate", when($"toClose", $"new.startDate").otherwise($"endDate"))
  .withColumn("active", when($"toClose", lit(0)).otherwise($"active"))
  .drop("toClose")
  .drop($"new.startDate")

// append all incoming rows unchanged (union matches columns by position)
val result = joinOldNew.union(df_new)
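
Note that unionAll is deprecated since Spark 2.0; union behaves the same way and, like unionAll, matches columns by position, which is why the extra column from the join is dropped before the union. To check the output against the expected table above, the result can be ordered the same way (a small usage sketch):

// order the merged history like the expected output and show it without truncation
result.orderBy($"pk", $"startDate").show(20, truncate = false)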
