


I've got a question on interpolating values in one column when I have complete TS column column ('b' here):

from pyspark.sql import SparkSession
from pyspark import Row

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \

df = spark.createDataFrame([Row(a=1, b='2019-09-26 09:53:10', c='7793740'),
                            Row(a=2, b='2019-09-26 09:54:12', c=''),
                            Row(a=3, b='2019-09-26 09:55:11', c='7793742'),
                            Row(a=4, b='2019-09-26 09:56:10', c=''),
                            Row(a=5, b='2019-09-26 09:57:11', c=''),
                            Row(a=6, b='2019-09-26 09:58:10', c='7793745'),
                            Row(a=7, b='2019-09-26 09:59:11', c=''),
                            Row(a=8, b='2019-09-26 10:00:10', c='7793747')])

df = df.withColumn('c', df['c'].cast('int'))
df = df.withColumn('b', df['b'].cast('timestamp'))

|  a|                  b|      c|
|  1|2019-09-26 09:53:10|7793740|
|  2|2019-09-26 09:54:12|   null|
|  3|2019-09-26 09:55:11|7793742|
|  4|2019-09-26 09:56:10|   null|
|  5|2019-09-26 09:57:11|   null|
|  6|2019-09-26 09:58:10|7793745|
|  7|2019-09-26 09:59:11|   null|
|  8|2019-09-26 10:00:10|7793747|


In pandas it would be simple, like:

import pandas as pd
import numpy as np

pdf = df.toPandas()

pdf = pdf.set_index('b')
pdf = pdf.interpolate(method='index', axis=0, limit_direction='forward')

                    b  a             c
0 2019-09-26 09:53:10  1  7.793740e+06
1 2019-09-26 09:54:12  2  7.793741e+06
2 2019-09-26 09:55:11  3  7.793742e+06
3 2019-09-26 09:56:10  4  7.793743e+06
4 2019-09-26 09:57:11  5  7.793744e+06
5 2019-09-26 09:58:10  6  7.793745e+06
6 2019-09-26 09:59:11  7  7.793746e+06
7 2019-09-26 10:00:10  8  7.793747e+06


Can we avoid udfs in my case? If not, how to use them (I'm thinking on case where I would have millions of rows).


Can we also use interpolation in both directions in cases when first value is null? Thank you!


Seems there's no direct function to interpolate between spark DataFrame columns, Here's my thought how to do this. We can put the interpolate into a UDF.

spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.createDataFrame([Row(a=1, b='2019-09-26 09:53:10', c='7793740'),
                            Row(a=2, b='2019-09-26 09:54:12', c=''),
                            Row(a=3, b='2019-09-26 09:55:11', c='7793742'),
                            Row(a=4, b='2019-09-26 09:56:10', c=''),
                            Row(a=5, b='2019-09-26 09:57:11', c=''),
                            Row(a=6, b='2019-09-26 09:58:10', c='7793745'),
                            Row(a=7, b='2019-09-26 09:59:11', c=''),
                            Row(a=8, b='2019-09-26 10:00:10', c='7793747')])

df = df.withColumn('c', df['c'].cast('int'))
df = df.withColumn('b', df['b'].cast('timestamp'))

df = df.withColumn('flag', F.lit(1))
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def interpolate(pdf):
    pdf = pdf.set_index('b')
    pdf.sort_values(by=['a'], inplace=True)
    pdf = pdf.interpolate(method='index', axis=0, limit_direction='forward')
    return pdf

df = df.groupby(['flag']).apply(interpolate)



|  a|                  b|      c|flag|
|  1|2019-09-26 09:53:10|7793740|   1|
|  2|2019-09-26 09:54:12|7793741|   1|
|  3|2019-09-26 09:55:11|7793742|   1|
|  4|2019-09-26 09:56:10|7793742|   1|
|  5|2019-09-26 09:57:11|7793744|   1|
|  6|2019-09-26 09:58:10|7793745|   1|
|  7|2019-09-26 09:59:11|7793746|   1|
|  8|2019-09-26 10:00:10|7793747|   1|

If there are millions of rows, you can use two or three flag values, i.e. [1,2], splitting the data into several groups, and apply interpolation on each sub-range. But do use limit_area to restrain on interpolate only. There'll be at most two Nulls for each flag value. Then you re-assign the flags such that the nulls are enclosed by valid data, re-do the interpolation.


maybe other people can think about a better method.


