This article shows how to get the names of the columns that hold the three largest values in each row of a PySpark DataFrame.
Problem description
Sample DataFrame
id a1 a2 a3 a4 a5 a6
0 5 23 4 1 4 5
1 6 43 2 2 98 43
2 3 56 3 1 23 3
3 2 2 6 3 5 2
4 5 6 7 2 7 5
I need output like this (first two rows shown):
top1 top2 top3
a2 a1 a6
a5 a2 a6
Recommended answer
Hope this helps!
from pyspark.sql.functions import col, udf, array, sort_array
from pyspark.sql.types import ArrayType, StringType

# Assumes an active SparkContext named sc (as in the pyspark shell).
df = sc.parallelize([(0, 5, 23, 4, 1, 4, 5),
                     (1, 6, 43, 2, 2, 98, 43),
                     (2, 3, 56, 3, 1, 23, 3),
                     (3, 2, 2, 6, 3, 5, 2),
                     (4, 5, 6, 7, 2, 7, 5)]).\
    toDF(["id", "a1", "a2", "a3", "a4", "a5", "a6"])

df_col = df.columns
value_cols = df_col[1:]  # a1..a6; "id" is excluded

# Sort each row's values in descending order and keep the three largest.
sorted_vals = sort_array(array([col(x) for x in value_cols]), asc=False)
df = df.\
    withColumn("top1_val", sorted_vals[0]).\
    withColumn("top2_val", sorted_vals[1]).\
    withColumn("top3_val", sorted_vals[2])

def modify_values(r, max_col):
    # Collect every column whose value equals max_col (ties yield several names).
    l = []
    for i in range(len(value_cols)):
        if r[i] == max_col:
            l.append(value_cols[i])
    return l

# The function returns a list, so the return type is declared as an array of strings.
modify_values_udf = udf(modify_values, ArrayType(StringType()))

df1 = df.\
    withColumn("top1", modify_values_udf(array(value_cols), "top1_val")).\
    withColumn("top2", modify_values_udf(array(value_cols), "top2_val")).\
    withColumn("top3", modify_values_udf(array(value_cols), "top3_val"))
df1.show()
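Output (when values tie, every matching column name is listed, so the same list can appear in more than one top column):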
+---+---+---+---+---+---+---+--------+--------+--------+--------+--------+------------+
| id| a1| a2| a3| a4| a5| a6|top1_val|top2_val|top3_val| top1| top2| top3|
+---+---+---+---+---+---+---+--------+--------+--------+--------+--------+------------+
| 0| 5| 23| 4| 1| 4| 5| 23| 5| 5| [a2]|[a1, a6]| [a1, a6]|
| 1| 6| 43| 2| 2| 98| 43| 98| 43| 43| [a5]|[a2, a6]| [a2, a6]|
| 2| 3| 56| 3| 1| 23| 3| 56| 23| 3| [a2]| [a5]|[a1, a3, a6]|
| 3| 2| 2| 6| 3| 5| 2| 6| 5| 3| [a3]| [a5]| [a4]|
| 4| 5| 6| 7| 2| 7| 5| 7| 7| 6|[a3, a5]|[a3, a5]| [a2]|
+---+---+---+---+---+---+---+--------+--------+--------+--------+--------+------------+
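The output above lists every tied column, whereas the question asks for exactly one column name per slot (for example a2, a1, a6 for id 0). A minimal sketch of one way to get that, assuming ties should be broken by column order: the helper name top_n_columns is hypothetical and not part of the original answer, and it is written in plain Python so the ranking logic can be checked without a Spark session.

```python
from typing import List, Sequence


def top_n_columns(names: Sequence[str], values: Sequence[float], n: int = 3) -> List[str]:
    """Return the names of the n largest values; ties keep column order."""
    # sorted() is stable, so columns with equal values stay in their
    # original left-to-right order after sorting by descending value.
    ranked = sorted(zip(names, values), key=lambda pair: -pair[1])
    return [name for name, _ in ranked[:n]]


value_cols = ["a1", "a2", "a3", "a4", "a5", "a6"]
rows = [
    (0, 5, 23, 4, 1, 4, 5),
    (1, 6, 43, 2, 2, 98, 43),
    (2, 3, 56, 3, 1, 23, 3),
    (3, 2, 2, 6, 3, 5, 2),
    (4, 5, 6, 7, 2, 7, 5),
]

# Map each id to its three top column names (row[0] is the id).
result = {row[0]: top_n_columns(value_cols, row[1:]) for row in rows}
for row_id, tops in result.items():
    print(row_id, tops)
```

In Spark, the same function could presumably be wrapped with udf and a return type of ArrayType(StringType()), called with array(value_cols) as the values argument and the column names bound in a closure, and the three elements then selected by index into top1, top2 and top3.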