Problem description
I have a PySpark dataframe that keeps track of changes that occur in a product's price and status over months. This means that a new row is created only when a change occurred (in either status or price) compared to the previous month, like in the dummy data below:
+----------+---------+-----+-------+
|product_id|   status|price|  month|
+----------+---------+-----+-------+
|         1|available|    5|2019-10|
|         1|available|    8|2020-08|
|         1|  limited|    8|2020-10|
|         2|  limited|    1|2020-09|
|         2|  limited|    3|2020-10|
+----------+---------+-----+-------+
I would like to create a dataframe that shows the values for each of the last 6 months. This means that I need to duplicate records wherever there is a gap in the above dataframe. For example, if the last 6 months are 2020-07, 2020-08, ..., 2020-12, then the result for the above dataframe should be:
+----------+---------+-----+-------+
|product_id|   status|price|  month|
+----------+---------+-----+-------+
|         1|available|    5|2020-07|
|         1|available|    8|2020-08|
|         1|available|    8|2020-09|
|         1|  limited|    8|2020-10|
|         1|  limited|    8|2020-11|
|         1|  limited|    8|2020-12|
|         2|  limited|    1|2020-09|
|         2|  limited|    3|2020-10|
|         2|  limited|    3|2020-11|
|         2|  limited|    3|2020-12|
+----------+---------+-----+-------+
Notice that for product_id = 1 there is an older record from 2019-10 that is propagated until 2020-08 and then trimmed to the start of the window, whereas for product_id = 2 there are no records prior to 2020-09, so the months 2020-07 and 2020-08 are not filled for it (the product did not exist before 2020-09).
Since the dataframe consists of millions of records, a "brute-force" solution that uses for loops and checks every product_id is rather slow. It seems it should be possible to solve this with window functions, by creating another column next_month and then filling in the gaps based on that column, but I don't know how to achieve that.
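For intuition, here is a plain-Python sketch of the desired forward-fill semantics on the dummy data above. This is a reference implementation only; a row-by-row loop like this is exactly the slow approach that does not scale, but it pins down the expected output:

```python
# Plain-Python sketch of the forward-fill semantics (reference only).
records = [
    (1, "available", 5, "2019-10"),
    (1, "available", 8, "2020-08"),
    (1, "limited",   8, "2020-10"),
    (2, "limited",   1, "2020-09"),
    (2, "limited",   3, "2020-10"),
]

def month_range(start, end):
    """Yield 'yyyy-MM' strings from start to end, inclusive."""
    y, m = map(int, start.split("-"))
    ey, em = map(int, end.split("-"))
    while (y, m) <= (ey, em):
        yield f"{y:04d}-{m:02d}"
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)

def fill_months(records, first, last):
    out = []
    for pid in sorted({r[0] for r in records}):
        rows = sorted((r for r in records if r[0] == pid), key=lambda r: r[3])
        for month in month_range(first, last):
            # Last known change at or before this month;
            # 'yyyy-MM' strings compare chronologically.
            known = [r for r in rows if r[3] <= month]
            if known:
                _, status, price, _ = known[-1]
                out.append((pid, status, price, month))
    return out

result = fill_months(records, "2020-07", "2020-12")
```

This reproduces the 10 expected rows: product 1 is filled for all 6 months, product 2 only from 2020-09 onward.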
Recommended answer
Following the @jxc comment, I have prepared the answer for this use case.
Below are the code snippets.

- Import the Spark SQL functions and Window:

from pyspark.sql import functions as F, Window
- Prepare the sample data:

simpleData = [(1, "Available", 5, "2020-07"),
    (1, "Available", 8, "2020-08"),
    (1, "Limited", 8, "2020-12"),
    (2, "Limited", 1, "2020-09"),
    (2, "Limited", 3, "2020-12")
]
columns = ["product_id", "status", "price", "month"]
- Create a dataframe from the sample data:

df = spark.createDataFrame(data=simpleData, schema=columns)
- Add a date column to the dataframe to get a properly formatted date:
df0 = df.withColumn("date",F.to_date('month','yyyy-MM'))
df0.show()
+----------+---------+-----+-------+----------+
|product_id| status|price| month| date|
+----------+---------+-----+-------+----------+
| 1|Available| 5|2020-07|2020-07-01|
| 1|Available| 8|2020-08|2020-08-01|
| 1| Limited| 8|2020-12|2020-12-01|
| 2| Limited| 1|2020-09|2020-09-01|
| 2| Limited| 3|2020-12|2020-12-01|
+----------+---------+-----+-------+----------+
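As a plain-Python aside, the 'yyyy-MM' pattern in to_date anchors each month string to the first day of that month, which is what makes the later month arithmetic work:

```python
from datetime import datetime

# Plain-Python equivalent of F.to_date('month', 'yyyy-MM'):
# a 'yyyy-MM' string parses to the first day of that month.
def parse_month(s):
    return datetime.strptime(s, "%Y-%m").date()

print(parse_month("2020-07"))  # 2020-07-01
```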
- Create the WindowSpec w1 and use the lead window function over w1 to find the next change date, shifted back one month, to set the end of each record's date sequence:
w1 = Window.partitionBy('product_id').orderBy('date')
df1 = df0.withColumn('end_date',F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
df1.show()
+----------+---------+-----+-------+----------+----------+
|product_id| status|price| month| date| end_date|
+----------+---------+-----+-------+----------+----------+
| 1|Available| 5|2020-07|2020-07-01|2020-07-01|
| 1|Available| 8|2020-08|2020-08-01|2020-11-01|
| 1| Limited| 8|2020-12|2020-12-01|2020-12-01|
| 2| Limited| 1|2020-09|2020-09-01|2020-11-01|
| 2| Limited| 3|2020-12|2020-12-01|2020-12-01|
+----------+---------+-----+-------+----------+----------+
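The end_date logic can be checked in plain Python (a sketch, not the Spark code): each row's validity ends one month before the next change (lead(date) minus one month), or at its own date if it is the last change for the product:

```python
from datetime import date

def add_months(d, n):
    """Shift a date by n months, day fixed to 1 (as in the month data)."""
    total = d.year * 12 + (d.month - 1) + n
    return date(total // 12, total % 12 + 1, 1)

def with_end_dates(dates):
    """dates: sorted change dates for one product -> (date, end_date) pairs."""
    out = []
    for i, d in enumerate(dates):
        nxt = dates[i + 1] if i + 1 < len(dates) else None  # lead(date)
        end = add_months(nxt, -1) if nxt else d             # coalesce(..., date)
        out.append((d, end))
    return out

product1 = [date(2020, 7, 1), date(2020, 8, 1), date(2020, 12, 1)]
```

For product_id = 1 this yields exactly the three (date, end_date) pairs shown in the df1 output above.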
- Use months_between(end_date, date) to calculate the number of months between the two dates, use the transform function to iterate through sequence(0, #months) creating a named_struct with date = add_months(date, i) and price = IF(i=0, price, price), then explode the array of structs with inline_outer:
df2 = df1.selectExpr("product_id", "status", """
    inline_outer(
      transform(
        sequence(0, int(months_between(end_date, date)), 1), i ->
          (add_months(date, i) as date, IF(i=0, price, price) as price)
      )
    )
""")
df2.show()
+----------+---------+----------+-----+
|product_id| status| date|price|
+----------+---------+----------+-----+
| 1|Available|2020-07-01| 5|
| 1|Available|2020-08-01| 8|
| 1|Available|2020-09-01| 8|
| 1|Available|2020-10-01| 8|
| 1|Available|2020-11-01| 8|
| 1| Limited|2020-12-01| 8|
| 2| Limited|2020-09-01| 1|
| 2| Limited|2020-10-01| 1|
| 2| Limited|2020-11-01| 1|
| 2| Limited|2020-12-01| 3|
+----------+---------+----------+-----+
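The sequence/transform/inline_outer expansion has a simple plain-Python analogue (a sketch for intuition only): for one (date, end_date) row, emit a record for every month in the inclusive range, carrying the status and price forward:

```python
from datetime import date

def add_months(d, n):
    """Shift a date by n months, day fixed to 1."""
    total = d.year * 12 + (d.month - 1) + n
    return date(total // 12, total % 12 + 1, 1)

def months_between(end, start):
    """Whole months from start to end (both first-of-month dates)."""
    return (end.year - start.year) * 12 + (end.month - start.month)

def explode_row(status, price, start, end):
    # sequence(0, int(months_between(end_date, date))) + transform(...)
    return [(status, add_months(start, i), price)
            for i in range(months_between(end, start) + 1)]

rows = explode_row("Available", 8, date(2020, 8, 1), date(2020, 11, 1))
```

Applied to the second row of df1, this produces the four 2020-08 through 2020-11 rows seen in the df2 output above.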
- Partition the dataframe on product_id and add a rank column in df3 to get the row number of each row. Then store the maximum of the rank column as a new column max_rank for each product_id in df4:
w2 = Window.partitionBy('product_id').orderBy('date')
df3 = df2.withColumn('rank',F.row_number().over(w2))
Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int]
df3.show()
+----------+---------+----------+-----+----+
|product_id| status| date|price|rank|
+----------+---------+----------+-----+----+
| 1|Available|2020-07-01| 5| 1|
| 1|Available|2020-08-01| 8| 2|
| 1|Available|2020-09-01| 8| 3|
| 1|Available|2020-10-01| 8| 4|
| 1|Available|2020-11-01| 8| 5|
| 1| Limited|2020-12-01| 8| 6|
| 2| Limited|2020-09-01| 1| 1|
| 2| Limited|2020-10-01| 1| 2|
| 2| Limited|2020-11-01| 1| 3|
| 2| Limited|2020-12-01| 3| 4|
+----------+---------+----------+-----+----+
df4 = df3.groupBy("product_id").agg(F.max('rank').alias('max_rank'))
Schema: DataFrame[product_id: bigint, max_rank: int]
df4.show()
+----------+--------+
|product_id|max_rank|
+----------+--------+
| 1| 6|
| 2| 4|
+----------+--------+
- Join the df3 and df4 dataframes on product_id to get max_rank:
df5 = df3.join(df4,df3.product_id == df4.product_id,"inner") \
.select(df3.product_id,df3.status,df3.date,df3.price,df3.rank,df4.max_rank)
Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int, max_rank: int]
df5.show()
+----------+---------+----------+-----+----+--------+
|product_id| status| date|price|rank|max_rank|
+----------+---------+----------+-----+----+--------+
| 1|Available|2020-07-01| 5| 1| 6|
| 1|Available|2020-08-01| 8| 2| 6|
| 1|Available|2020-09-01| 8| 3| 6|
| 1|Available|2020-10-01| 8| 4| 6|
| 1|Available|2020-11-01| 8| 5| 6|
| 1| Limited|2020-12-01| 8| 6| 6|
| 2| Limited|2020-09-01| 1| 1| 4|
| 2| Limited|2020-10-01| 1| 2| 4|
| 2| Limited|2020-11-01| 1| 3| 4|
| 2| Limited|2020-12-01| 3| 4| 4|
+----------+---------+----------+-----+----+--------+
- Then finally filter the df5 dataframe using the between function to get the latest 6 months of data:
FinalResultDF = df5.filter(F.col('rank') \
.between(F.when(F.col('max_rank') > 6, F.col('max_rank') - 5).otherwise(1), F.col('max_rank'))) \
.select(df5.product_id,df5.status,df5.date,df5.price)
FinalResultDF.show(truncate=False)
+----------+---------+----------+-----+
|product_id|status |date |price|
+----------+---------+----------+-----+
|1 |Available|2020-07-01|5 |
|1 |Available|2020-08-01|8 |
|1 |Available|2020-09-01|8 |
|1 |Available|2020-10-01|8 |
|1 |Available|2020-11-01|8 |
|1 |Limited |2020-12-01|8 |
|2 |Limited |2020-09-01|1 |
|2 |Limited |2020-10-01|1 |
|2 |Limited |2020-11-01|1 |
|2 |Limited |2020-12-01|3 |
+----------+---------+----------+-----+
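A plain-Python sanity check of the final filter's arithmetic: because between is inclusive on both ends, keeping exactly the newest 6 ranks per product requires a lower bound of max_rank - 5 (floored at 1), not max_rank - 6:

```python
# Model of the per-product filter: 'rank between lo and max_rank' is
# inclusive on both ends, so keeping at most the newest 6 rows needs
# lo = max_rank - 5 (floored at 1).
def keep_last_6(ranks):
    max_rank = max(ranks)
    lo = max(1, max_rank - 5)
    return [r for r in ranks if lo <= r <= max_rank]

print(keep_last_6(list(range(1, 9))))  # ranks 1..8 -> [3, 4, 5, 6, 7, 8]
```

With the sample data (max_rank of 6 and 4) all rows pass either way, which is why the bound only matters for products with more than 6 filled months.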