我需要对RDD进行排序。排序需要在记录的多个字段上,因此我需要一个自定义比较器。

我看到sortBy因为它只接受一个键。我偶然遇到了http://codingjunkie.net/spark-secondary-sort/,因此使用了repartitionAndSortWithinPartitions来达到同样的效果。

sortBy为什么不接受自定义比较器并进行排序?为什么只需要重新分区才能使用自定义比较器?

最佳答案

问题1:这是方法sortBy的签名:

  /**
   * Return this RDD sorted by the given key function.
   */
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
  }



  您的RDD数据对象显然是T类型的


请注意,sortBy方法绝对具有单个键参数字段:f: (T) => K

它接受匿名函数,因此您可以轻松生成自定义的可比较结构,并充分利用具有自己明确定义的比较器的常见数据类型。

例如,如果您将RDD [Int,Int]称为数据,则可以执行以下操作:

val cmp = (t: (Int, Int)) => (t._1, -t._2)
data.sortBy(cmp)


这样可以实现多字段轻松比较,对吧?


  这将获得排序的RDD,其中第一个字段递增,第二个字段递增
  下降。


问题2:repartitionAndSortWithinPartitions用法

这是一个特定的rdd运算符,旨在比调用重新分区然后在每个分区内进行排序更有效。

您的程序在排序之前不需要预先分区,只需在此特定通用模式下进行内部优化即可获得高性能。

有关详细信息,请参考document

关于java - 使用Spark排序,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46638504/

10-12 18:23
查看更多