Problem description
I have a Spark (2.4.0) data frame with a column that has just two values (either 0 or 1). I need to calculate the streak of consecutive 0s and 1s in this data, resetting the streak to zero if the value changes.
Example:
from pyspark.sql import (SparkSession, Window)
from pyspark.sql.functions import (to_date, row_number, lead, col)
spark = SparkSession.builder.appName('test').getOrCreate()
# Create dataframe
df = spark.createDataFrame([
('2018-01-01', 'John', 0, 0),
('2018-01-01', 'Paul', 1, 0),
('2018-01-08', 'Paul', 3, 1),
('2018-01-08', 'Pete', 4, 0),
('2018-01-08', 'John', 3, 0),
('2018-01-15', 'Mary', 6, 0),
('2018-01-15', 'Pete', 6, 0),
('2018-01-15', 'John', 6, 1),
('2018-01-15', 'Paul', 6, 1),
], ['str_date', 'name', 'value', 'flag'])
df.orderBy('name', 'str_date').show()
## +----------+----+-----+----+
## | str_date|name|value|flag|
## +----------+----+-----+----+
## |2018-01-01|John| 0| 0|
## |2018-01-08|John| 3| 0|
## |2018-01-15|John| 6| 1|
## |2018-01-15|Mary| 6| 0|
## |2018-01-01|Paul| 1| 0|
## |2018-01-08|Paul| 3| 1|
## |2018-01-15|Paul| 6| 1|
## |2018-01-08|Pete| 4| 0|
## |2018-01-15|Pete| 6| 0|
## +----------+----+-----+----+
With this data, I'd like to calculate the streak of consecutive zeros and ones, ordered by date and "windowed" by name:
# Expected result:
## +----------+----+-----+----+--------+--------+
## | str_date|name|value|flag|streak_0|streak_1|
## +----------+----+-----+----+--------+--------+
## |2018-01-01|John| 0| 0| 1| 0|
## |2018-01-08|John| 3| 0| 2| 0|
## |2018-01-15|John| 6| 1| 0| 1|
## |2018-01-15|Mary| 6| 0| 1| 0|
## |2018-01-01|Paul| 1| 0| 1| 0|
## |2018-01-08|Paul| 3| 1| 0| 1|
## |2018-01-15|Paul| 6| 1| 0| 2|
## |2018-01-08|Pete| 4| 0| 1| 0|
## |2018-01-15|Pete| 6| 0| 2| 0|
## +----------+----+-----+----+--------+--------+
Of course, I would need the streak to reset itself to zero if the 'flag' changes.
Is there a way of doing this?
Accepted answer
This calls for a difference-of-row-numbers approach: first group consecutive rows that share the same flag value, then rank the rows within each group.
from pyspark.sql import Window
from pyspark.sql import functions as f

# Window definitions; note the column is 'str_date' (ISO-format date strings
# sort correctly as plain strings, so no to_date conversion is needed here)
w1 = Window.partitionBy(df.name).orderBy(df.str_date)
w2 = Window.partitionBy(df.name, df.flag).orderBy(df.str_date)

# The difference of the two row numbers is constant within each run of
# consecutive rows sharing the same flag, so it acts as a group id
res = df.withColumn('grp', f.row_number().over(w1) - f.row_number().over(w2))

# Window definition for the streak: rank rows within each (name, flag, grp) run
w3 = Window.partitionBy(res.name, res.flag, res.grp).orderBy(res.str_date)
streak_res = res.withColumn('streak_0', f.when(res.flag == 1, 0).otherwise(f.row_number().over(w3))) \
                .withColumn('streak_1', f.when(res.flag == 0, 0).otherwise(f.row_number().over(w3)))
streak_res.show()
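To see why the subtraction isolates each streak, it helps to inspect the intermediate grp column; the trace below is worked out by hand from the sample data above.
# Optional sanity check: inspect the intermediate group ids
res.orderBy('name', 'str_date').select('str_date', 'name', 'flag', 'grp').show()
## For Paul, row_number over w1 gives 1, 2, 3 and over w2 gives 1 (flag 0),
## then 1, 2 (flag 1), so grp = 0, 1, 1: the two consecutive flag-1 rows
## share a group id and are ranked 1 and 2 by w3, matching streak_1 above.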
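For reference, the same grouping can also be built from a lag-based change indicator plus a running sum. This is a common alternative formulation, sketched here under the same column names; it is not part of the original answer.
from pyspark.sql import Window
from pyspark.sql import functions as f

w = Window.partitionBy('name').orderBy('str_date')

# Mark the first row of each streak: the flag differs from the previous row
# (lag() is NULL on a partition's first row, so that row is marked as well)
alt = df.withColumn('changed',
                    f.when(f.lag('flag').over(w).isNull() |
                           (f.lag('flag').over(w) != f.col('flag')), 1).otherwise(0))
# A running sum of the change markers assigns each streak its own group id
alt = alt.withColumn('grp', f.sum('changed').over(w))

w_grp = Window.partitionBy('name', 'grp').orderBy('str_date')
alt = alt.withColumn('streak_0', f.when(f.col('flag') == 0, f.row_number().over(w_grp)).otherwise(0)) \
         .withColumn('streak_1', f.when(f.col('flag') == 1, f.row_number().over(w_grp)).otherwise(0))
alt.orderBy('name', 'str_date').show()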