本文介绍了PySpark->在一列中插值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我拥有完整的TS列列(此处为'b')时,我对一个列中的内插值有疑问:

I've got a question on interpolating values in one column when I have complete TS column column ('b' here):

from pyspark.sql import SparkSession
from pyspark import Row

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

df = spark.createDataFrame([Row(a=1, b='2019-09-26 09:53:10', c='7793740'),
                            Row(a=2, b='2019-09-26 09:54:12', c=''),
                            Row(a=3, b='2019-09-26 09:55:11', c='7793742'),
                            Row(a=4, b='2019-09-26 09:56:10', c=''),
                            Row(a=5, b='2019-09-26 09:57:11', c=''),
                            Row(a=6, b='2019-09-26 09:58:10', c='7793745'),
                            Row(a=7, b='2019-09-26 09:59:11', c=''),
                            Row(a=8, b='2019-09-26 10:00:10', c='7793747')])

df = df.withColumn('c', df['c'].cast('int'))
df = df.withColumn('b', df['b'].cast('timestamp'))
df.show()

|  a|                  b|      c|
+---+-------------------+-------+
|  1|2019-09-26 09:53:10|7793740|
|  2|2019-09-26 09:54:12|   null|
|  3|2019-09-26 09:55:11|7793742|
|  4|2019-09-26 09:56:10|   null|
|  5|2019-09-26 09:57:11|   null|
|  6|2019-09-26 09:58:10|7793745|
|  7|2019-09-26 09:59:11|   null|
|  8|2019-09-26 10:00:10|7793747|
+---+-------------------+-------+

在大熊猫中,这很简单,例如:

In pandas it would be simple, like:

import pandas as pd
import numpy as np

pdf = df.toPandas()

pdf = pdf.set_index('b')
pdf = pdf.interpolate(method='index', axis=0, limit_direction='forward')
pdf.reset_index(inplace=True)

                    b  a             c
0 2019-09-26 09:53:10  1  7.793740e+06
1 2019-09-26 09:54:12  2  7.793741e+06
2 2019-09-26 09:55:11  3  7.793742e+06
3 2019-09-26 09:56:10  4  7.793743e+06
4 2019-09-26 09:57:11  5  7.793744e+06
5 2019-09-26 09:58:10  6  7.793745e+06
6 2019-09-26 09:59:11  7  7.793746e+06
7 2019-09-26 10:00:10  8  7.793747e+06

我们可以避免使用udfs吗?如果没有的话,如何使用它们(我在考虑会有几百万行的情况).

Can we avoid udfs in my case? If not, how to use them (I'm thinking on case where I would have millions of rows).

在第一个值为null的情况下,我们还可以在两个方向上使用插值吗?谢谢!

Can we also use interpolation in both directions in cases when first value is null? Thank you!

推荐答案

似乎没有直接函数可以在spark DataFrame列之间进行插值,这是我的想法.我们可以将插值放入UDF中.

Seems there's no direct function to interpolate between spark DataFrame columns, Here's my thought how to do this. We can put the interpolate into a UDF.

spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.createDataFrame([Row(a=1, b='2019-09-26 09:53:10', c='7793740'),
                            Row(a=2, b='2019-09-26 09:54:12', c=''),
                            Row(a=3, b='2019-09-26 09:55:11', c='7793742'),
                            Row(a=4, b='2019-09-26 09:56:10', c=''),
                            Row(a=5, b='2019-09-26 09:57:11', c=''),
                            Row(a=6, b='2019-09-26 09:58:10', c='7793745'),
                            Row(a=7, b='2019-09-26 09:59:11', c=''),
                            Row(a=8, b='2019-09-26 10:00:10', c='7793747')])

df = df.withColumn('c', df['c'].cast('int'))
df = df.withColumn('b', df['b'].cast('timestamp'))

df = df.withColumn('flag', F.lit(1))
df.show()
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def interpolate(pdf):
    pdf = pdf.set_index('b')
    pdf.sort_values(by=['a'], inplace=True)
    pdf = pdf.interpolate(method='index', axis=0, limit_direction='forward')
    pdf.reset_index(inplace=True)
    return pdf

df = df.groupby(['flag']).apply(interpolate)

df.sort(df['a']).show()

这将输出:

+---+-------------------+-------+----+
|  a|                  b|      c|flag|
+---+-------------------+-------+----+
|  1|2019-09-26 09:53:10|7793740|   1|
|  2|2019-09-26 09:54:12|7793741|   1|
|  3|2019-09-26 09:55:11|7793742|   1|
|  4|2019-09-26 09:56:10|7793742|   1|
|  5|2019-09-26 09:57:11|7793744|   1|
|  6|2019-09-26 09:58:10|7793745|   1|
|  7|2019-09-26 09:59:11|7793746|   1|
|  8|2019-09-26 10:00:10|7793747|   1|
+---+-------------------+-------+----+

如果有数百万行,则可以使用两个或三个标志值,即[1,2],将数据分为几组,然后对每个子范围应用插值.但是请使用 limit_area 仅限制 interpolate .每个标志值最多有两个Null.然后重新分配标志,以使有效数据包围空值,然后重新进行插值.

If there are millions of rows, you can use two or three flag values, i.e. [1,2], splitting the data into several groups, and apply interpolation on each sub-range. But do use limit_area to restrain on interpolate only. There'll be at most two Nulls for each flag value. Then you re-assign the flags such that the nulls are enclosed by valid data, re-do the interpolation.

也许其他人可以考虑一种更好的方法.

maybe other people can think about a better method.

这篇关于PySpark->在一列中插值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 10:15