我正在实现代码以将多个列动态添加到数据行中具有空值的数据帧

我在使用Dataframe对象的map函数的scala中找到了以下代码片段。

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, NullType, StructType}
import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.lit;

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
   val encoder = RowEncoder.apply(getSchema(df, words))
   df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
     val addedCols: List[Any] = words.map(_=> null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType = {
  var schema: StructType = df.schema
  words.foreach(word => schema = schema.add(word, "string", false))
  schema
}


我已经在Java中实现了以下两个功能

 private StructType getSchema(Dataset<Row> df, List<String> cols){
        StructType schema = df.schema();
        cols.forEach(col -> schema.add(col, "int", true));
        return schema;
    }

private addColumnsViaMap(Dataset<Row> df, List<String> cols){
    Encoder<Row> encoder1 =
  RowEncoder.apply(dataConsolidationEngine.getSchema(df,cols));
   df.map(new MapFunction<Set<String>, Row>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Row call(Set<String> cols) throws Exception {
                    // TODO Auto-generated method stub
                }
            }, encoder1);
}


由于参数不匹配,addColumnsViaMap方法具有编译错误,无法解析匿名映射函数方法。

而且我不理解mappingRows的scala代码,尤其是以下StructType => List[String] => Row => Row = (schema) => (words) => (row)这意味着什么?

以及如何在Java中实现上述scala代码?

最佳答案

嗯,这个声明有点复杂(IMO也有点难以理解),所以让我们退后一步。

在scala中,StringList ...是每个人都知道的类型。您可以创建类型为String的变量。

您还可以做的是将函数分配给变量(这是scala的函数方向),因此函数也具有类型。举例来说,如果您有一个使用List并输出String的函数,则该函数的类型为List => String

在代码中看起来像吗?

// A list of strings
val names = List("alice", "bob")
// A function that takes a list and returns a string
def listToString(list: List[String]): String = list.mkString(",")
// We can assign the function to a variable
val myListToString: List[String] => String = listToString


但是对于函数的声明,我们有一个较短的表示法,我们可以在不使用def语句的情况下将它们声明为“内联”。这样就可以等效地编写以上代码:

val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")


因此,一般而言:


A => B是一种函数的类型,该函数采用A并返回B
(arg: A) => { new B() }是一个实际函数,它将A的一个实例作为输入(该实例绑定到变量名称arg,并且其主体返回一个B的实例。


现在,让我们做一些疯狂的事情,让我们...重新开始。假设F是接受List并返回String的函数。带有Int并返回F的函数是什么样的?

好吧,它将是:


Int => F
也就是说:Int => (List => String)
可以写为Int => List => String


你怎么宣布呢?

// Borrowing from above
val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")
// now we're doing it
val intToListToString = (integerValue) => myListToString
// now we're doing it in one go
val intToListToString2 = (integerValue) => (list) => list.mkString(",")


在此,intToListToString是采用int并返回“带有List并返回String的函数”的函数。

您可以一次又一次地筑巢。

直到得到:StructType => List[String] => Row => Row,这是一种类型,表示“以StructType作为输入并返回的函数(以List[String]作为输入并返回的函数(以Row作为输入的函数并返回一行))。

您可以将其实现为:

(schema) => // a function that takes schema, and returns
    (words) => // a function that takes a list of words and returns
        (row) => // a function that takes a row and returns
            Row.fromSeq(...) // another row


现在在Java中会是什么样子?

如果要严格按原样进行转换,则可以这样考虑:scala的A => B的自然等效项是java.util.Function<A, B>。最重要的是,如果要使用函数在map上执行Spark Dataframe操作,则必须使用MapFunction<>

因此,我们正在寻求实现Function<Schema, Function<List<String>, MapFunction<Row, Row>>>之类的东西。

使用java lambda表示法,您可以通过以下方式实现:

schema ->  words -> row -> Row.merge(row, Row.fromSeq(Array.newInstance(String.class, words.size)))


这是一个采用模式的功能,


返回一个需要单词列表的函数


返回一个需要行的函数


返回一行,该行增加了包含null的列




也许我的Java语法是正确的,也许不是我不知道。

我所知道的是,这是实现您的要求的一种过于复杂的方法。

这个要求是什么:您有一个数据框,有一个单词列表,您想使用此名称并包含null的新列。

所以我将在scala中完成以下操作:

import org.apache.spark.sql.DataFrame
def addColumnsViaMap(dataframe: DataFrame, words: List[String]) = words.foldLeft(dataframe)((df, word) => df.withColumn(word, lit(null: String)))

val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
val words = List("columnC", "columnD")
addColumnsViaMap(dataframe, words).show

+-------+-------+-------+-------+
|columnA|columnB|columnC|columnD|
+-------+-------+-------+-------+
|      a|      b|   null|   null|
|      c|      d|   null|   null|
+-------+-------+-------+-------+


您可能可以这样用Java编写

DataFrame addColumnsViaMap(DataFrame dataframe, List<String> words) {
    for (String word: words) {
        dataframe = dataframe.withColumn(word, lit((String) null))
    }
    return dataframe;
}


再一次,我没有基于Java的spark环境,但我的观点是:如果您掌握了原理,则重新编写很简单。

10-07 19:28
查看更多