I'm working with a large volume of log files and would like to move the work to Spark, but I don't know how to aggregate events over event-based time windows as easily as I can in Pandas.
Here is exactly what I want to do:
For a log file of users who have experienced a certain event (shown below), I want to go back 7 days in time and return an aggregate of all the other columns.
Here it is in Pandas. Any ideas how to port this to PySpark?
import pandas as pd

df = pd.DataFrame({'user_id': [1, 1, 1, 2, 2, 2],
                   'event': [0, 1, 0, 0, 0, 1],
                   'other': [12, 20, 16, 84, 11, 15],
                   'event_date': ['2015-01-01 00:02:43', '2015-01-04 00:02:03',
                                  '2015-01-10 00:12:26', '2015-01-01 00:02:43',
                                  '2015-01-06 00:02:43', '2015-01-12 18:10:09']})
df['event_date'] = pd.to_datetime(df['event_date'])
df
which gives:
event event_date other user_id
0 0 2015-01-01 00:02:43 12 1
1 1 2015-01-04 00:02:03 20 1
2 0 2015-01-10 00:12:26 16 1
3 0 2015-01-01 00:02:43 84 2
4 0 2015-01-06 00:02:43 11 2
5 1 2015-01-12 18:10:09 15 2
I want to group this DataFrame by user_id and then exclude from the aggregation any rows that are more than 7 days older than the row where 'event' == 1.
In Pandas, it looks like this:
def f(x):
    # Find the row where the event occurred
    win = x.event == 1
    # Get the date when event == 1
    event_date = list(x[win]['event_date'])[0]
    # Construct the 7-day window ending at the event
    min_date = event_date - pd.DateOffset(days=7)
    # Restrict x to this date window
    x = x[(x.event_date > min_date) & (x.event_date <= event_date)].copy()
    # Aggregate 'other' over the window
    x['other'] = x.other.sum()
    # Return only the event row
    return x[x.event == 1]

df.groupby(by='user_id').apply(f).reset_index(drop=True)
which gives the desired output (one row per user, where event_date corresponds to the row with event == 1):
event event_date other user_id
0 1 2015-01-04 00:02:03 32 1
1 1 2015-01-12 18:10:09 26 2
Does anyone know where to start to get this result in Spark?
Best Answer
It's rather SQL-ish, but you can do something like this:
from pyspark.sql.functions import sum, col, udf
from pyspark.sql.types import BooleanType

# With raw SQL you can use datediff, but it looks like it is not
# available as a function yet
def less_than_n_days(n):
    return udf(lambda dt1, dt2: 0 <= (dt1 - dt2).days < n, BooleanType())

# Select only events
events = df.where(df.event == 1).select(
    df.event_date.alias("evd"), df.user_id.alias("uid"))

(events
    .join(df, (events.uid == df.user_id) & (events.evd >= df.event_date))
    .where(less_than_n_days(7)(col("evd"), col("event_date")))
    .groupBy("evd", "user_id")
    .agg(sum("other").alias("other"))
    .withColumnRenamed("evd", "event_date"))
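Note that the snippet above assumes df is already a Spark DataFrame with the same columns as the Pandas one. A minimal setup sketch, assuming Spark 1.3+ with an existing SparkContext named sc, and with the Pandas frame from the question held in a variable called pandas_df (a hypothetical name, to avoid clashing with the Spark df) could look like:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # sc: an existing SparkContext (assumed)
# pandas_df: the Pandas DataFrame built in the question (hypothetical name)
df = sqlContext.createDataFrame(pandas_df)
df.printSchema()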
Unfortunately, we cannot include less_than_n_days in the join clause, because a udf can only access columns from a single table. Since that limitation does not apply to the built-in datediff, you may prefer raw SQL like this:

df.registerTempTable("df")
events.registerTempTable("events")
sqlContext.sql("""
    SELECT evd AS event_date, user_id, SUM(other) AS other
    FROM df JOIN events ON
        df.user_id = events.uid AND
        datediff(evd, event_date) BETWEEN 0 AND 6
    GROUP BY evd, user_id""")
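As a side note, from Spark 1.5 onward datediff is also exposed in pyspark.sql.functions, so the same filter can be expressed without a UDF. A sketch under that assumption, reusing events and df from above:

from pyspark.sql.functions import col, datediff, sum

(events
    .join(df, (events.uid == df.user_id) & (events.evd >= df.event_date))
    # datediff(end, start) returns the difference in whole days
    .where(datediff(col("evd"), col("event_date")) < 7)
    .groupBy("evd", "user_id")
    .agg(sum("other").alias("other"))
    .withColumnRenamed("evd", "event_date"))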
Regarding python - (Py)Spark - grouping by user over a time period, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/28707987/