本文介绍了PYSpark显示最大值(S)和多重排序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

感谢您在这里提供的帮助。使用Pyspark(请不能使用SQL)。因此,我有一个存储为RDD对的元组列表:

例如(‘City5’,‘2020-03-25’,‘X5’)作为关键字,15作为最后一对的值。

我希望获得以下结果:

请注意,结果显示为:

  • 每个城市的最大值关键字(这是最难的部分,如果在不同日期有相似的最大值,要显示同一城市两次,我假设不能使用ReduceByKey(),因为关键字不唯一,可能是GroupBy()或Filter()?

  • 按以下顺序排序:

  1. 最大值递减
  2. 升序日期
  3. 降序城市名称(例如:City1)

所以我尝试了以下代码:

res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])

虽然它给了我最大值的城市,但如果两个城市在两个不同的日期有相似的最大值,它不会两次返回同一个城市(因为减去了键:City)。

我希望它足够清楚,任何精确度请询问!非常感谢!

推荐答案

要使所有城市的值都等于最大值,您仍然可以使用reduceByKey,但使用数组而不是Over Value:

  • 将行转换为键/值,值是元组数组而不是元组
  • 按键减少,如果数组包含相同的值,则合并数组,否则保留具有最大值的数组,reduceByKey
  • 通过flatMap
  • 平面化Value数组,将键与它们合并
  • 最后执行排序

完整代码如下:

def merge(array1, array2):
    if array1[0][2] > array2[0][2]:
        return array1
    elif array1[0][2] == array2[0][2]:
        return array1 + array2
    else:
        return array2


res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))

然后您可以打印您的RDD:

[print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]

这与您的输入一起提供了以下输出:

City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5

这篇关于PYSpark显示最大值(S)和多重排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 10:27