Question
I would like to sort a DataFrame based on a column using my own comparator. Is it possible to do this in Spark SQL?
For example, suppose I have a DataFrame registered as table "MyTable" with a column "Day" whose type is string:
id | Day
--------------------
1 | Fri
2 | Mon
3 | Sat
4 | Sun
5 | Thu
I would like to execute this query:
SELECT * FROM MyTable ORDER BY Day
I would like to order the "Day" column with my own comparator. I thought about using a UDF, but I don't know if that is possible. Note that I really want to use my comparator in the Sort/Order By operation; I don't want to convert the strings in column "Day" to Datetime or anything similar.
Answer
In Spark SQL, you do not have a choice and need to use orderBy with one or more columns. With RDDs, you can use a custom Java-like comparator if you feel like it. Indeed, here is the signature of the sortBy method of an RDD (see the Scaladoc of Spark 2.4):
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
This means that you can provide an Ordering of your choice, which is exactly like a Java Comparator (Ordering actually inherits from Comparator).
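To illustrate that correspondence outside of Spark, here is a minimal plain-Scala sketch (the object and value names are mine, for illustration only): a Scala Ordering implements the same compare contract as a java.util.Comparator, and can be used directly wherever a Comparator is expected.

```scala
object OrderingDemo {
  // Compare Ints by absolute value -- the same contract as Comparator.compare:
  // negative if x < y, zero if equal, positive if x > y.
  val absOrdering: Ordering[Int] = new Ordering[Int] {
    def compare(x: Int, y: Int): Int = x.abs - y.abs
  }

  // Ordering extends java.util.Comparator, so the same object can be
  // passed to any Java API expecting a Comparator.
  val asComparator: java.util.Comparator[Int] = absOrdering

  // Standard collection methods accept it too.
  val byAbs: Seq[Int] = Seq(3, -1, -4, 2).sorted(absOrdering)
}
```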
For simplicity, let's say I want to sort by the absolute value of a column "x" (this could be done without a comparator, but let's assume I need one). I start by defining my comparator on rows:
class RowOrdering extends Ordering[Row] {
  // Compare two rows by the absolute value of their "x" column.
  // (Subtraction is fine for small values; use Integer.compare to be
  // safe against overflow.)
  def compare(x: Row, y: Row): Int = x.getAs[Int]("x").abs - y.getAs[Int]("x").abs
}
Now let's define some data and sort it:
import spark.implicits._ // needed for toDF

val df = Seq((0, 1), (1, 2), (2, 4), (3, 7), (4, 1), (5, -1), (6, -2),
  (7, 5), (8, 5), (9, 0), (10, -9)).toDF("id", "x")
val rdd = df.rdd.sortBy(identity)(new RowOrdering(), scala.reflect.classTag[Row])
val sorted_df = spark.createDataFrame(rdd, df.schema)
sorted_df.show
+---+---+
| id| x|
+---+---+
| 9| 0|
| 0| 1|
| 4| 1|
| 5| -1|
| 6| -2|
| 1| 2|
| 2| 4|
| 7| 5|
| 8| 5|
| 3| 7|
| 10| -9|
+---+---+
Another solution is to define an implicit ordering, so you don't need to provide it when sorting:
implicit val ord = new RowOrdering()
df.rdd.sortBy(identity)
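The same implicit resolution can be seen in plain Scala, without Spark. In this sketch (names are mine, for illustration), a locally scoped implicit Ordering is picked up automatically by sorted, just as RDD.sortBy picks up its implicit ord parameter:

```scala
object ImplicitOrdDemo {
  // A locally scoped implicit takes precedence over the default
  // Ordering[Int], so sorted uses it without an explicit argument.
  implicit val byAbs: Ordering[Int] = Ordering.by(x => math.abs(x))

  val result: Seq[Int] = Seq(3, -1, 2).sorted // no explicit ordering needed
}
```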
Finally, note that df.rdd.sortBy(_.getAs[Int]("x").abs) would achieve the same result. Also, you can use tuple ordering to do more complex things, such as ordering by absolute value and, in case of ties, putting the positive value first:
df.rdd.sortBy(x => (x.getAs[Int]("x").abs, - x.getAs[Int]("x"))) // RDD
df.orderBy(abs($"x"), - $"x") // DataFrame
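The tuple trick works because tuple Orderings compare lexicographically: first component first, then the second on ties. A plain-Scala sketch of the same idea (object name is mine, for illustration):

```scala
object TupleOrderDemo {
  val xs = Seq(1, -1, 5, 5, -9, 0, 2, -2)

  // Sort by (|x|, -x): primary key is the absolute value; on ties,
  // -x is smaller for the positive value, so positives come first.
  val sorted: Seq[Int] = xs.sortBy(x => (x.abs, -x))
}
```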