我在名为 end_stats_df 的 pyspark 数据框中有以下数据:

values     start    end    cat1   cat2
10          1        2      A      B
11          1        2      C      B
12          1        2      D      B
510         1        2      D      C
550         1        2      C      B
500         1        2      A      B
80          1        3      A      B

我想通过以下方式聚合它:
  • 我想使用“开始”和“结束”列作为聚合键
  • 对于每组行,我需要执行以下操作:
  • 计算该组的 cat1cat2 中值的唯一数量。例如,对于 start =1 和 end =2 的组,这个数字将是 4,因为有 A、B、C、D。这个数字将存储为 n(在这个例子中 n=4)。
  • 对于 values 字段,对于每个组,我需要对 values 进行排序,然后选择每个 n-1 值,其中 n 是从上面的第一个操作中存储的值。
  • 在聚合结束时,我并不真正关心经过上述操作后 cat1cat2 中的内容。

  • 上面示例的示例输出是:
    values     start    end    cat1   cat2
    12          1        2      D      B
    550         1        2      C      B
    80          1        3      A      B
    

    如何使用 pyspark 数据框完成?我假设我需要使用自定义 UDAF,对吗?

    最佳答案

    Pyspark 不直接支持 UDAF,因此我们必须手动进行聚合。

    from pyspark.sql import functions as f
    
    def func(values, cat1, cat2):
        n = len(set(cat1 + cat2))
        return sorted(values)[n - 2]
    
    
    df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', sep='\t', header=True)
    df = df.groupBy(df['start'], df['end']).agg(f.collect_list(df['values']).alias('values'),
                                                f.collect_set(df['cat1']).alias('cat1'),
                                                f.collect_set(df['cat2']).alias('cat2'))
    df = df.select(df['start'], df['end'], f.UserDefinedFunction(func, StringType())(df['values'], df['cat1'], df['cat2']))
    

    关于apache-spark - 如何在多列上编写 Pyspark UDAF?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46187630/

    10-16 21:44