我在名为 end_stats_df
的 pyspark 数据框中有以下数据:
values start end cat1 cat2
10 1 2 A B
11 1 2 C B
12 1 2 D B
510 1 2 D C
550 1 2 C B
500 1 2 A B
80 1 3 A B
我想通过以下方式聚合它:
cat1
和 cat2
中值的唯一数量。例如,对于 start
=1 和 end
=2 的组,这个数字将是 4,因为有 A、B、C、D。这个数字将存储为 n
(在这个例子中 n=4)。 values
字段,对于每个组,我需要对 values
进行排序,然后选择每个 n-1
值,其中 n
是从上面的第一个操作中存储的值。 cat1
和 cat2
中的内容。 上面示例的示例输出是:
values start end cat1 cat2
12 1 2 D B
550 1 2 C B
80 1 3 A B
如何使用 pyspark 数据框完成?我假设我需要使用自定义 UDAF,对吗?
最佳答案
Pyspark 不直接支持 UDAF
,因此我们必须手动进行聚合。
from pyspark.sql import functions as f
def func(values, cat1, cat2):
n = len(set(cat1 + cat2))
return sorted(values)[n - 2]
df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', sep='\t', header=True)
df = df.groupBy(df['start'], df['end']).agg(f.collect_list(df['values']).alias('values'),
f.collect_set(df['cat1']).alias('cat1'),
f.collect_set(df['cat2']).alias('cat2'))
df = df.select(df['start'], df['end'], f.UserDefinedFunction(func, StringType())(df['values'], df['cat1'], df['cat2']))
关于apache-spark - 如何在多列上编写 Pyspark UDAF?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46187630/