Question
I have a dataframe in PySpark. Here is what it looks like:
+---------+---------+
|timestamp| price |
+---------+---------+
|670098928| 50 |
|670098930| 53 |
|670098934| 55 |
+---------+---------+
I want to fill in the gaps in timestamp with the previous state, so that I can get a complete set to calculate time-weighted averages. Here is what the output should look like:
+---------+---------+
|timestamp| price |
+---------+---------+
|670098928| 50 |
|670098929| 50 |
|670098930| 53 |
|670098931| 53 |
|670098932| 53 |
|670098933| 53 |
|670098934| 55 |
+---------+---------+
Eventually, I want to persist this new dataframe on disk and visualize my analysis.
How do I do this in PySpark? (For simplicity's sake, I have kept just two columns; my actual dataframe has 89 columns and ~670 million records before filling the gaps.)
Recommended answer
You can generate timestamp ranges, flatten them with explode, and keep one row per timestamp:
import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType, ArrayType

# Assumes an active SparkContext `sc` (e.g. in the pyspark shell)
a = sc.parallelize([[670098928, 50], [670098930, 53], [670098934, 55]])\
    .toDF(['timestamp', 'price'])

# Expand each timestamp into a fixed window of 5 consecutive values;
# list(...) is needed because a Python 3 range object is not a valid
# ArrayType return value
f = func.udf(lambda x: list(range(x, x + 5)), ArrayType(IntegerType()))

a.withColumn('timestamp', f(a.timestamp))\
    .withColumn('timestamp', func.explode(func.col('timestamp')))\
    .groupBy('timestamp')\
    .agg(func.max(func.col('price')))\
    .show()
+---------+----------+
|timestamp|max(price)|
+---------+----------+
|670098928| 50|
|670098929| 50|
|670098930| 53|
|670098931| 53|
|670098932| 53|
|670098933| 53|
|670098934| 55|
|670098935| 55|
|670098936| 55|
|670098937| 55|
|670098938| 55|
+---------+----------+
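Two caveats about this answer: the hard-coded range(x, x + 5) runs past the last timestamp (the output continues to 670098938), and max(price) only resolves the overlapping ranges correctly because the prices in this sample are non-decreasing. A more general gap fill derives each row's exact span from the following row's timestamp. Below is a minimal sketch, assuming Spark 2.4+ (for func.sequence) and an active SparkSession named spark; the output path at the end is a placeholder for the persist-to-disk step mentioned in the question.

import pyspark.sql.functions as func
from pyspark.sql import Window

df = spark.createDataFrame(
    [(670098928, 50), (670098930, 53), (670098934, 55)],
    ['timestamp', 'price'])

# Look ahead to the next row's timestamp; the last row has no successor,
# so fall back to timestamp + 1 to emit exactly one row for it.
# Note: a window without partitionBy pulls all rows into one partition;
# at ~670 million records you would partition by some grouping column.
w = Window.orderBy('timestamp')

filled = (df
    .withColumn('next_ts', func.coalesce(func.lead('timestamp').over(w),
                                         func.col('timestamp') + 1))
    # sequence(start, stop) is inclusive, so stop one before the next timestamp
    .withColumn('timestamp',
                func.explode(func.sequence(func.col('timestamp'),
                                           func.col('next_ts') - 1)))
    .drop('next_ts'))

filled.show()
filled.write.mode('overwrite').parquet('/tmp/filled_prices')  # placeholder path

After this fill every timestamp carries equal weight, so the time-weighted average reduces to a plain avg over price.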