问题:
给定一个时间序列数据(它是用户活动的点击流)存储在配置单元中,要求使用Spark使用会话ID丰富数据。
会话定义
闲置1小时后,会话过期
会话保持活动状态,总计2个小时
数据:
click_time,user_id
2018-01-01 11:00:00,u1
2018-01-01 12:10:00,u1
2018-01-01 13:00:00,u1
2018-01-01 13:50:00,u1
2018-01-01 14:40:00,u1
2018-01-01 15:30:00,u1
2018-01-01 16:20:00,u1
2018-01-01 16:50:00,u1
2018-01-01 11:00:00,u2
2018-01-02 11:00:00,u2
以下是仅考虑会话定义中第一点的部分解决方案:
val win1 = Window.partitionBy("user_id").orderBy("click_time")
val sessionnew = when((unix_timestamp($"click_time") - unix_timestamp(lag($"click_time",1,"2017-01-01 11:00:00.0").over(win1)))/60 >= 60, 1).otherwise(0)
userActivity
.withColumn("session_num",sum(sessionnew).over(win1))
.withColumn("session_id",concat($"user_id", $"session_num"))
.show(truncate = false)
实际输出:
+---------------------+-------+-----------+----------+
|click_time |user_id|session_num|session_id|
+---------------------+-------+-----------+----------+
|2018-01-01 11:00:00.0|u1 |1 |u11 |
|2018-01-01 12:10:00.0|u1 |2 |u12 | -- session u12 starts
|2018-01-01 13:00:00.0|u1 |2 |u12 |
|2018-01-01 13:50:00.0|u1 |2 |u12 |
|2018-01-01 14:40:00.0|u1 |2 |u12 | -- this should be a new session as diff of session start of u12 and this row exceeds 2 hours
|2018-01-01 15:30:00.0|u1 |2 |u12 |
|2018-01-01 16:20:00.0|u1 |2 |u12 |
|2018-01-01 16:50:00.0|u1 |2 |u12 | -- now this has to be compared with row 5 to find difference
|2018-01-01 11:00:00.0|u2 |1 |u21 |
|2018-01-02 11:00:00.0|u2 |2 |u22 |
+---------------------+-------+-----------+----------+
为了包括第二个条件,我试图找出当前时间与上次会话开始时间之间的差异,以检查该时间是否超过了2小时,但是随后的行引用本身发生了变化。这些是一些可以通过累加总和实现的用例,但这并不适合此处。
最佳答案
这不是一个要解决的直截了当的问题,但这是一种方法:
使用窗口lag
时间戳差异来识别0
每个用户的会话(其中rule #1
=会话开始)
对数据集进行分组以组合每个用户的时间戳比较列表
通过UDF处理时间戳比较列表以标识rule #2
的会话并为每个用户创建所有会话ID
通过Spark的explode
扩展分组的数据集
下面的示例代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val userActivity = Seq(
("2018-01-01 11:00:00", "u1"),
("2018-01-01 12:10:00", "u1"),
("2018-01-01 13:00:00", "u1"),
("2018-01-01 13:50:00", "u1"),
("2018-01-01 14:40:00", "u1"),
("2018-01-01 15:30:00", "u1"),
("2018-01-01 16:20:00", "u1"),
("2018-01-01 16:50:00", "u1"),
("2018-01-01 11:00:00", "u2"),
("2018-01-02 11:00:00", "u2")
).toDF("click_time", "user_id")
def clickSessList(tmo: Long) = udf{ (uid: String, clickList: Seq[String], tsList: Seq[Long]) =>
def sid(n: Long) = s"$uid-$n"
val sessList = tsList.foldLeft( (List[String](), 0L, 0L) ){ case ((ls, j, k), i) =>
if (i == 0 || j + i >= tmo) (sid(k + 1) :: ls, 0L, k + 1) else
(sid(k) :: ls, j + i, k)
}._1.reverse
clickList zip sessList
}
请注意,UDF中
foldLeft
的累加器是(ls, j, k)
的元组,其中:ls
是要返回的格式化会话ID的列表j
和k
分别用于将条件更改的时间戳记值和会话ID号继续到下一次迭代步骤
1
:val tmo1: Long = 60 * 60
val tmo2: Long = 2 * 60 * 60
val win1 = Window.partitionBy("user_id").orderBy("click_time")
val df1 = userActivity.
withColumn("ts_diff", unix_timestamp($"click_time") - unix_timestamp(
lag($"click_time", 1).over(win1))
).
withColumn("ts_diff", when(row_number.over(win1) === 1 || $"ts_diff" >= tmo1, 0L).
otherwise($"ts_diff")
)
df1.show
// +-------------------+-------+-------+
// | click_time|user_id|ts_diff|
// +-------------------+-------+-------+
// |2018-01-01 11:00:00| u1| 0|
// |2018-01-01 12:10:00| u1| 0|
// |2018-01-01 13:00:00| u1| 3000|
// |2018-01-01 13:50:00| u1| 3000|
// |2018-01-01 14:40:00| u1| 3000|
// |2018-01-01 15:30:00| u1| 3000|
// |2018-01-01 16:20:00| u1| 3000|
// |2018-01-01 16:50:00| u1| 1800|
// |2018-01-01 11:00:00| u2| 0|
// |2018-01-02 11:00:00| u2| 0|
// +-------------------+-------+-------+
步骤
2
-4
:val df2 = df1.
groupBy("user_id").agg(
collect_list($"click_time").as("click_list"), collect_list($"ts_diff").as("ts_list")
).
withColumn("click_sess_id",
explode(clickSessList(tmo2)($"user_id", $"click_list", $"ts_list"))
).
select($"user_id", $"click_sess_id._1".as("click_time"), $"click_sess_id._2".as("sess_id"))
df2.show
// +-------+-------------------+-------+
// |user_id|click_time |sess_id|
// +-------+-------------------+-------+
// |u1 |2018-01-01 11:00:00|u1-1 |
// |u1 |2018-01-01 12:10:00|u1-2 |
// |u1 |2018-01-01 13:00:00|u1-2 |
// |u1 |2018-01-01 13:50:00|u1-2 |
// |u1 |2018-01-01 14:40:00|u1-3 |
// |u1 |2018-01-01 15:30:00|u1-3 |
// |u1 |2018-01-01 16:20:00|u1-3 |
// |u1 |2018-01-01 16:50:00|u1-4 |
// |u2 |2018-01-01 11:00:00|u2-1 |
// |u2 |2018-01-02 11:00:00|u2-2 |
// +-------+-------------------+-------+
另请注意,在步骤
click_time
-2
中“通过”了“ 4
”,以便将其包含在最终数据集中。关于sql - 动态高级的Spark高级窗口,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54662219/