Removing null values and shifting values from the next column in PySpark

Problem description

I need to port a Python script to PySpark, and it is proving to be a tough task for me.

I'm trying to remove null values from a DataFrame (without dropping the entire column or row) and shift each following value into the preceding column. Example:

          CLIENT | ANIMAL_1 | ANIMAL_2 | ANIMAL_3 | ANIMAL_4
ROW_1     1      | cow      | frog     | null     | dog
ROW_2     2      | pig      | null     | cat      | null

My goal is:

          CLIENT | ANIMAL_1 | ANIMAL_2 | ANIMAL_3 | ANIMAL_4
ROW_1     1      | cow      | frog     | dog      | null
ROW_2     2      | pig      | cat      | null     | null

The code I'm using in Python (with pandas), which I got here on Stack Overflow, is:

df_out = df.apply(lambda x: pd.Series(x.dropna().to_numpy()), axis=1)

Then I rename the columns. But I have no idea how to do this in PySpark.

Recommended answer

Here's a way to do this for Spark version 2.4+:

Create an array of the columns you want and sort by your conditions, which are the following:

  1. Sort non-null values first
  2. Sort values in the order in which they appear in the columns

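We can do the sorting with array_sort. To apply both conditions at once, use arrays_zip to combine an "is null" flag and the column position into one struct per element, so that array_sort compares the flag first and the position second. To make it easy to extract the value you want (i.e. the animal in this example), zip the column values in as well.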

from pyspark.sql.functions import array, array_sort, arrays_zip, col, lit

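# Every column after the first (CLIENT) holds an animal value.
animal_cols = df.columns[1:]
N = len(animal_cols)

df_out = df.select(
    df.columns[0],
    array_sort(
        arrays_zip(
            # Field '0': null flag, so non-null entries (false) sort first.
            array([col(c).isNull() for c in animal_cols]),
            # Field '1': original column position, to keep the original order.
            array([lit(i) for i in range(N)]),
            # Field '2': the value itself, carried along for extraction.
            array([col(c) for c in animal_cols])
        )
    ).alias('sorted')
)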
df_out.show(truncate=False)
#+------+----------------------------------------------------------------+
#|CLIENT|sorted                                                          |
#+------+----------------------------------------------------------------+
#|1     |[[false, 0, cow], [false, 1, frog], [false, 3, dog], [true, 2,]]|
#|2     |[[false, 0, pig], [false, 2, cat], [true, 1,], [true, 3,]]      |
#+------+----------------------------------------------------------------+
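
The ordering shown in the output above can be mimicked in plain Python, which may help to see why zipping the null flag and the position gives the desired result. This is only an illustrative sketch, not Spark code: the `row` list is hypothetical sample data matching ROW_1 from the question, and Python tuples stand in for the structs that arrays_zip produces.

```python
# Plain-Python analogue of arrays_zip + array_sort for a single row.
# `row` is hypothetical sample data matching ROW_1 from the question.
row = ["cow", "frog", None, "dog"]

# Build (is_null, position, value) triples, mirroring the three zipped arrays.
triples = [(value is None, i, value) for i, value in enumerate(row)]

# Sort by the null flag first (False < True), then by original position.
# The value itself is carried along but is not part of the sort key.
ordered = sorted(triples, key=lambda t: (t[0], t[1]))

print(ordered)
# [(False, 0, 'cow'), (False, 1, 'frog'), (False, 3, 'dog'), (True, 2, None)]
```

The position is unique within a row, so the flag and the position alone decide the order and the values never need to be compared with each other.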

Now that the entries are in the right order, you just need to extract the value. Because the zipped arrays are unnamed, the struct fields are named '0', '1' and '2', so the animal is field '2' of the i-th element of the sorted column.

df_out = df_out.select(
    df.columns[0],
    *[col("sorted")[i]['2'].alias(animal_cols[i]) for i in range(N)]
)
df_out.show(truncate=False)
#+------+--------+--------+--------+--------+
#|CLIENT|ANIMAL_1|ANIMAL_2|ANIMAL_3|ANIMAL_4|
#+------+--------+--------+--------+--------+
#|1     |cow     |frog    |dog     |null    |
#|2     |pig     |cat     |null    |null    |
#+------+--------+--------+--------+--------+
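
To check a result like the one above on a small sample, a plain-Python reference can be handy. The sketch below is not part of the original answer: `shift_non_nulls_left` is a hypothetical helper name, and `rows` is sample data copied from the question. It only reproduces the expected values per row, which could then be compared against rows collected from the Spark DataFrame.

```python
from typing import Dict, List, Optional


def shift_non_nulls_left(values: List[Optional[str]]) -> List[Optional[str]]:
    """Move non-null values to the front, keep their order, pad with None."""
    kept = [v for v in values if v is not None]
    return kept + [None] * (len(values) - len(kept))


# Hypothetical sample data: CLIENT id -> ANIMAL_1..ANIMAL_4 from the question.
rows: Dict[int, List[Optional[str]]] = {
    1: ["cow", "frog", None, "dog"],
    2: ["pig", None, "cat", None],
}

expected = {client: shift_non_nulls_left(animals) for client, animals in rows.items()}
print(expected)
# {1: ['cow', 'frog', 'dog', None], 2: ['pig', 'cat', None, None]}
```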

08-14 22:52