Problem description
Given a set of transactions for various email IDs, for example:
val df = Seq(
("[email protected]", "2020-10-01 01:04:00", "txid-0", false),
("[email protected]", "2020-10-02 01:04:00", "txid-1", true),
("[email protected]", "2020-10-02 02:04:00", "txid-2", false),
("[email protected]", "2020-10-02 03:04:00", "txid-3", true),
("[email protected]", "2020-10-02 04:04:00", "txid-4", false),
("[email protected]", "2020-10-02 04:05:00", "txid-5", false),
("[email protected]", "2020-10-02 05:04:00", "txid-6", true),
("[email protected]", "2020-10-05 12:04:00", "txid-7", true),
("[email protected]", "2020-12-03 03:04:00", "txid-8", true),
("[email protected]", "2020-12-04 06:04:00", "txid-9", true)
).toDF("email", "timestamp", "transaction_id", "condition")
What I am looking for is the count of transactions, grouped by email, within the last 24 hours for which condition is true. If condition is false, the count column should just contain the last good count, i.e. the count from when condition was last true. For the data above, this is the expected result:
val expectedDF = Seq(
("[email protected]", "2020-10-01 01:04:00", "txid-0", false, 0),
("[email protected]", "2020-10-02 01:04:00", "txid-1", true, 1),
("[email protected]", "2020-10-02 02:04:00", "txid-2", false, 1),// copy last count since condition is false
("[email protected]", "2020-10-02 03:04:00", "txid-3", true, 2),
("[email protected]", "2020-10-02 04:04:00", "txid-4", false, 2),// copy last count since condition is false
("[email protected]", "2020-10-02 04:05:00", "txid-5", false, 2),// copy last count since condition is false
("[email protected]", "2020-10-02 05:04:00", "txid-6", true, 3),
("[email protected]", "2020-10-05 12:04:00", "txid-7", true, 1), // beyond 24 hrs from prev transaction
("[email protected]", "2020-12-03 03:04:00", "txid-8", true, 1), // new email
("[email protected]", "2020-12-04 06:04:00", "txid-9", true, 1) // new email
).toDF("email", "timestamp", "transaction_id", "condition", "count")
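To pin down the rule stated above, here is a minimal plain-Scala sketch (no Spark) that can be used to check expected values: a true row gets the number of true rows for the same email in the 24 hours up to and including it, and a false row carries forward the count of the most recent earlier true row (0 if there is none). It is only a model, not a Spark solution. The addresses in the question are obfuscated, so a@example.com, b@example.com and c@example.com are hypothetical placeholders; putting txid-0 through txid-7 under one address and txid-8 and txid-9 under separate ones is an assumption inferred from the comments in expectedDF. The names LastGoodCount, lastGoodCounts and sample are likewise made up for illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Plain-Scala model of the stated rule (no Spark involved).
// Email addresses are hypothetical placeholders: the originals in the question are obfuscated.
object LastGoodCount {
  final case class Tx(email: String, timestamp: String, id: String, condition: Boolean)

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  private val DaySeconds = 24L * 3600

  // Seconds since the epoch, reading the timestamp as UTC (only differences matter here)
  def epochSeconds(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

  // Returns one count per input row, in input order:
  //   condition == true  -> true rows of the same email with timestamp in [t - 24h, t]
  //   condition == false -> count of the latest earlier true row of that email, or 0
  def lastGoodCounts(txs: Seq[Tx]): Seq[Int] = {
    val countById: Map[String, Int] = txs.groupBy(_.email).values.flatMap { rows =>
      val sorted = rows.sortBy(r => epochSeconds(r.timestamp))
      val trueTimes = sorted.filter(_.condition).map(r => epochSeconds(r.timestamp))
      var lastGood = 0 // carried forward across false rows
      sorted.map { r =>
        if (r.condition) {
          val t = epochSeconds(r.timestamp)
          lastGood = trueTimes.count(s => s >= t - DaySeconds && s <= t)
        }
        r.id -> lastGood
      }
    }.toMap
    txs.map(tx => countById(tx.id))
  }

  // Sample data from the question; the grouping by address is an assumption (see above)
  val sample: Seq[Tx] = Seq(
    Tx("a@example.com", "2020-10-01 01:04:00", "txid-0", false),
    Tx("a@example.com", "2020-10-02 01:04:00", "txid-1", true),
    Tx("a@example.com", "2020-10-02 02:04:00", "txid-2", false),
    Tx("a@example.com", "2020-10-02 03:04:00", "txid-3", true),
    Tx("a@example.com", "2020-10-02 04:04:00", "txid-4", false),
    Tx("a@example.com", "2020-10-02 04:05:00", "txid-5", false),
    Tx("a@example.com", "2020-10-02 05:04:00", "txid-6", true),
    Tx("a@example.com", "2020-10-05 12:04:00", "txid-7", true),
    Tx("b@example.com", "2020-12-03 03:04:00", "txid-8", true),
    Tx("c@example.com", "2020-12-04 06:04:00", "txid-9", true)
  )

  def main(args: Array[String]): Unit =
    sample.zip(lastGoodCounts(sample)).foreach { case (tx, n) =>
      println(s"${tx.id} condition=${tx.condition} count=$n")
    }
}
```

On this sample the model reproduces the count column of expectedDF (0, 1, 1, 2, 2, 2, 3, 1, 1, 1). Since txid-8 and txid-9 are 27 hours apart, the result would be the same even if they shared an address.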
What I have tried so far:
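import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

// Parse "yyyy-MM-dd HH:mm:ss" into epoch seconds so the window can range over it
val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

// Per email, ordered by time; the frame spans the 24 hours (in seconds) up to and including the current row
val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-24 * 3600, Window.currentRow)

val resultDF = new_df
  .filter(col("condition")) // keeps only the rows where condition is true
  .withColumn("count", count(col("email")).over(winSpec))
resultDF.show()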
This prints the following, which leaves out the rows where condition == false. Instead, I want all rows, with the proper count values as in expectedDF:
("email", "timestamp", "transaction_id", "condition", "count")
("[email protected]", "2020-10-02 01:04:00", "txid-1", true, 1),
("[email protected]", "2020-10-02 03:04:00", "txid-3", true, 2),
("[email protected]", "2020-10-02 05:04:00", "txid-6", true, 3),
("[email protected]", "2020-10-05 12:04:00", "txid-7", true, 1),
("[email protected]", "2020-12-03 03:04:00", "txid-8", true, 1),
("[email protected]", "2020-12-04 06:04:00", "txid-9", true, 1)
I cannot find a way to apply a window function so that it is only evaluated when condition is true and otherwise copies the last good value from when condition was last true. Any help will be appreciated.
Recommended answer
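Do not filter; instead, put the condition inside the aggregate with when. Without an otherwise clause, when(col("condition"), col("email")) returns null for rows where condition is false, and count ignores nulls, so every row stays in the result while only the true rows in each 24-hour window are counted. For a false row, this is the number of true rows in the 24 hours up to that row's own timestamp, which equals the last good count for the data in the question.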
// Keep every row; count only the rows where condition is true
val resultDF = new_df
  .withColumn("count", count(when(col("condition"), col("email"))).over(winSpec))
resultDF.show()
+-----------+-------------------+--------------+---------+---------------------+-----+
| email| timestamp|transaction_id|condition|transaction_timestamp|count|
+-----------+-------------------+--------------+---------+---------------------+-----+
|[email protected]|2020-10-01 01:04:00| txid-0| false| 1.60151424E9| 0|
|[email protected]|2020-10-02 01:04:00| txid-1| true| 1.60160064E9| 1|
|[email protected]|2020-10-02 02:04:00| txid-2| false| 1.60160424E9| 1|
|[email protected]|2020-10-02 03:04:00| txid-3| true| 1.60160784E9| 2|
|[email protected]|2020-10-02 04:04:00| txid-4| false| 1.60161144E9| 2|
|[email protected]|2020-10-02 04:05:00| txid-5| false| 1.6016115E9| 2|
|[email protected]|2020-10-02 05:04:00| txid-6| true| 1.60161504E9| 3|
|[email protected]|2020-10-05 12:04:00| txid-7| true| 1.60189944E9| 1|
|[email protected]|2020-12-04 06:04:00| txid-9| true| 1.60706184E9| 1|
|[email protected]|2020-12-03 03:04:00| txid-8| true| 1.60696464E9| 1|
+-----------+-------------------+--------------+---------+---------------------+-----+
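Why this works, and where it differs from a literal "copy the last good count" rule, can be seen in a small plain-Scala model of the expression. This is a sketch of the semantics, not how Spark evaluates it internally, and RollingTrueCount, whenExpr, rollingCounts and gap are hypothetical names. None stands in for the null that when(...) returns without otherwise(...), and only rows with a defined value are counted inside the [t - 24h, t] range frame. Because each false row is evaluated against its own 24-hour window, a false row arriving more than 24 hours after the last true row gets 0 rather than the earlier count.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Plain-Scala model (not Spark) of:
//   count(when(col("condition"), col("email"))).over(winSpec)
// where winSpec partitions by email, orders by epoch seconds and uses rangeBetween(-24*3600, currentRow).
object RollingTrueCount {
  final case class Tx(email: String, timestamp: String, condition: Boolean)

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  def epochSeconds(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

  // when(cond, value) without otherwise(...) yields null when cond is false; None stands in for null
  def whenExpr(tx: Tx): Option[String] = if (tx.condition) Some(tx.email) else None

  // Every row is kept; count() skips nulls, so only true rows inside the frame are counted
  def rollingCounts(txs: Seq[Tx]): Seq[Int] = txs.map { cur =>
    val t = epochSeconds(cur.timestamp)
    txs.count { other =>
      val s = epochSeconds(other.timestamp)
      other.email == cur.email && s >= t - 24L * 3600 && s <= t && whenExpr(other).isDefined
    }
  }

  // Hypothetical data: a false row arriving 30 hours after the only true row
  val gap: Seq[Tx] = Seq(
    Tx("x@example.com", "2020-10-01 00:00:00", condition = true),
    Tx("x@example.com", "2020-10-02 06:00:00", condition = false)
  )

  def main(args: Array[String]): Unit =
    println(rollingCounts(gap)) // the false row gets 0, not the earlier count of 1
}
```

On the question's data the two readings coincide, which is why the output above matches expectedDF. If the literal carry-forward behavior is required, one option to explore is computing the count only on true rows (null elsewhere) and then filling the false rows from the previous true row, for example with last(column, ignoreNulls = true) over a frame from Window.unboundedPreceding to Window.currentRow.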