我想知道火花中是否有比使用rank()来找到一组列中最频繁的值更有效的方法,以便将其用作缺少值的插补。

例如。在Spark-SQL中,我可以制定类似的东西
how to select the most frequently appearing values?
每列。
该解决方案适用于使用等级的单列。我正在寻找的是a)一种更有效的变体(如第一个答案所述),b)比使用for循环和a)解决方案以适用于多列的解决方案最佳的解决方案。

您看到优化火花的任何可能性吗?

编辑

一个例子。这是一个小的数据集

case class FooBarGG(foo: Int, bar: String, baz: String, dropme: String)
val df = Seq((0, "first", "A", "dropme"), (1, "second", "A", "dropme2"),
    (0, "first", "B", "foo"),
    (1, "first", "C", "foo"))
    .toDF("foo", "bar", "baz", "dropme").as[FooBarGG]
val columnsFactor = Seq("bar", "baz")
val columnsToDrop = Seq("dropme")
val factorCol= (columnsFactor ++ columnsToDrop).map(c => col(c))


从答案中查询

df.groupBy(factorCol: _*).count.agg(max(struct($"count" +: factorCol: _*)).alias("mostFrequent")).show
+--------------------+
|        mostFrequent|
+--------------------+
|[1,second,A,dropme2]|
+--------------------+
|-- mostFrequent: struct (nullable = true)
 |    |-- count: long (nullable = false)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |    |-- dropme: string (nullable = true)


是结果,但对于列bar->首先,baz-> A和对于drompe-> foo是单个top1最频繁的值,与返回的结果不同。

最佳答案

您可以使用简单的聚合,只要您可以对字段进行排序并且计数是最主要的:

import org.apache.spark.sql.functions._

val df = Seq("John", "Jane", "Eve", "Joe", "Eve").toDF("name")
val grouping = Seq($"name")

df.groupBy(grouping: _*).count.agg(max(struct($"count" +: grouping: _*)))


也可以使用静态类型的Dataset

import org.apache.spark.sql.catalyst.encoders.RowEncoder

df.groupByKey(x => x)(RowEncoder(df.schema)).count.reduce(
  (x, y) => if (x._2 > y._2) x else y
)


您可以调整分组列或键功能以处理更复杂的情况。

10-08 19:02