问题描述
我需要将 Python 脚本转换为 Pyspark,这对我来说是一项艰巨的任务.
I need to transform a Python script to Pyspark and it's being a tough task for me.
我正在尝试从数据框中删除空值(不删除整个列或行)并将下一个值移到前一列.示例:
I'm trying to remove null values from a dataframe (without removing the entire column or row) and shift the next value to the prior column. Example:
CLIENT| ANIMAL_1 | ANIMAL_2 | ANIMAL_3| ANIMAL_4
ROW_1 1 | cow | frog | null | dog
ROW_2 2 | pig | null | cat | null
我的目标是:
CLIENT| ANIMAL_1 | ANIMAL_2 | ANIMAL_3| ANIMAL_4
ROW_1 1 | cow | frog | dog | null
ROW_2 2 | pig | cat | null | null
我在 python 上使用的代码是(我在 Stackoverflow 上得到的):
The code I'm using on python is (which I got here on Stackoverflow):
df_out = df.apply(lambda x: pd.Series(x.dropna().to_numpy()), axis=1)
然后我重命名列.但我不知道如何在 Pyspark 上做到这一点.
Then I rename the columns. But I have no idea how to do this on Pyspark.
推荐答案
以下是针对 Spark 2.4+ 版本执行此操作的方法:
Here's a way to do this for Spark version 2.4+:
创建您想要的列的数组并按您的条件排序,如下所示:
Create an array of the columns you want and sort by your conditions, which are the following:
- 先对非空值排序
- 按照列中出现的顺序对值进行排序
我们可以使用array_sort
.要实现多个条件,请使用 arrays_zip
.为了更轻松地提取您想要的值(即本例中的动物),还需要压缩列值.
We can do the sorting by using array_sort
. To achieve the multiple conditions, use arrays_zip
. To make it easy to extract the value you want (i.e. the animal in this example) zip column value as well.
from pyspark.sql.functions import array, array_sort, arrays_zip, col, lit
animal_cols = df.columns[1:]
N = len(animal_cols)
df_out = df.select(
df.columns[0],
array_sort(
arrays_zip(
array([col(c).isNull() for c in animal_cols]),
array([lit(i) for i in range(N)]),
array([col(c) for c in animal_cols])
)
).alias('sorted')
)
df_out.show(truncate=False)
#+------+----------------------------------------------------------------+
#|CLIENT|sorted |
#+------+----------------------------------------------------------------+
#|1 |[[false, 0, cow], [false, 1, frog], [false, 3, dog], [true, 2,]]|
#|2 |[[false, 0, pig], [false, 2, cat], [true, 1,], [true, 3,]] |
#+------+----------------------------------------------------------------+
既然事情的顺序是正确的,您只需要提取值.在本例中,它是 sorted
列的第 i 个索引中元素 '2'
处的项目.
Now that things are in the right order, you just need to extract the value. In this case, that's the item at element '2'
in the i-th index of sorted
column.
df_out = df_out.select(
df.columns[0],
*[col("sorted")[i]['2'].alias(animal_cols[i]) for i in range(N)]
)
df_out.show(truncate=False)
#+------+--------+--------+--------+--------+
#|CLIENT|ANIMAL_1|ANIMAL_2|ANIMAL_3|ANIMAL_4|
#+------+--------+--------+--------+--------+
#|1 |cow |frog |dog |null |
#|2 |pig |cat |null |null |
#+------+--------+--------+--------+--------+
这篇关于从pyspark中的下一列中删除空值和移位值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!