我正在实现代码以将多个列动态添加到数据行中具有空值的数据帧
我在使用Dataframe对象的map函数的scala中找到了以下代码片段。
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, NullType, StructType}
import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.lit;
def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
val encoder = RowEncoder.apply(getSchema(df, words))
df.map(mappingRows(df.schema)(words))(encoder)
}
private val mappingRows: StructType => List[String] => Row => Row =
(schema) => (words) => (row) => {
val addedCols: List[Any] = words.map(_=> null)
Row.merge(row, Row.fromSeq(addedCols))
}
private def getSchema(df: DataFrame, words: List[String]): StructType = {
var schema: StructType = df.schema
words.foreach(word => schema = schema.add(word, "string", false))
schema
}
我已经在Java中实现了以下两个功能
private StructType getSchema(Dataset<Row> df, List<String> cols){
StructType schema = df.schema();
cols.forEach(col -> schema.add(col, "int", true));
return schema;
}
private addColumnsViaMap(Dataset<Row> df, List<String> cols){
Encoder<Row> encoder1 =
RowEncoder.apply(dataConsolidationEngine.getSchema(df,cols));
df.map(new MapFunction<Set<String>, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(Set<String> cols) throws Exception {
// TODO Auto-generated method stub
}
}, encoder1);
}
由于参数不匹配,addColumnsViaMap方法具有编译错误,无法解析匿名映射函数方法。
而且我不理解mappingRows的scala代码,尤其是以下
StructType => List[String] => Row => Row = (schema) => (words) => (row)
这意味着什么?以及如何在Java中实现上述scala代码?
最佳答案
嗯,这个声明有点复杂(IMO也有点难以理解),所以让我们退后一步。
在scala中,String
,List
...是每个人都知道的类型。您可以创建类型为String
的变量。
您还可以做的是将函数分配给变量(这是scala的函数方向),因此函数也具有类型。举例来说,如果您有一个使用List
并输出String
的函数,则该函数的类型为List => String
。
在代码中看起来像吗?
// A list of strings
val names = List("alice", "bob")
// A function that takes a list and returns a string
def listToString(list: List[String]): String = list.mkString(",")
// We can assign the function to a variable
val myListToString: List[String] => String = listToString
但是对于函数的声明,我们有一个较短的表示法,我们可以在不使用
def
语句的情况下将它们声明为“内联”。这样就可以等效地编写以上代码:val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")
因此,一般而言:
A => B
是一种函数的类型,该函数采用A
并返回B
(arg: A) => { new B() }
是一个实际函数,它将A
的一个实例作为输入(该实例绑定到变量名称arg
,并且其主体返回一个B的实例。现在,让我们做一些疯狂的事情,让我们...重新开始。假设
F
是接受List
并返回String
的函数。带有Int
并返回F
的函数是什么样的?好吧,它将是:
Int => F
。也就是说:
Int => (List => String)
可以写为
Int => List => String
你怎么宣布呢?
// Borrowing from above
val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")
// now we're doing it
val intToListToString = (integerValue) => myListToString
// now we're doing it in one go
val intToListToString2 = (integerValue) => (list) => list.mkString(",")
在此,
intToListToString
是采用int
并返回“带有List
并返回String
的函数”的函数。您可以一次又一次地筑巢。
直到得到:
StructType => List[String] => Row => Row
,这是一种类型,表示“以StructType
作为输入并返回的函数(以List[String]
作为输入并返回的函数(以Row
作为输入的函数并返回一行))。您可以将其实现为:
(schema) => // a function that takes schema, and returns
(words) => // a function that takes a list of words and returns
(row) => // a function that takes a row and returns
Row.fromSeq(...) // another row
现在在Java中会是什么样子?
如果要严格按原样进行转换,则可以这样考虑:scala的
A => B
的自然等效项是java.util.Function<A, B>
。最重要的是,如果要使用函数在map
上执行Spark Dataframe
操作,则必须使用MapFunction<>
。因此,我们正在寻求实现
Function<Schema, Function<List<String>, MapFunction<Row, Row>>>
之类的东西。使用java lambda表示法,您可以通过以下方式实现:
schema -> words -> row -> Row.merge(row, Row.fromSeq(Array.newInstance(String.class, words.size)))
这是一个采用模式的功能,
返回一个需要单词列表的函数
返回一个需要行的函数
返回一行,该行增加了包含null的列
也许我的Java语法是正确的,也许不是我不知道。
我所知道的是,这是实现您的要求的一种过于复杂的方法。
这个要求是什么:您有一个数据框,有一个单词列表,您想使用此名称并包含null的新列。
所以我将在scala中完成以下操作:
import org.apache.spark.sql.DataFrame
def addColumnsViaMap(dataframe: DataFrame, words: List[String]) = words.foldLeft(dataframe)((df, word) => df.withColumn(word, lit(null: String)))
val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
val words = List("columnC", "columnD")
addColumnsViaMap(dataframe, words).show
+-------+-------+-------+-------+
|columnA|columnB|columnC|columnD|
+-------+-------+-------+-------+
| a| b| null| null|
| c| d| null| null|
+-------+-------+-------+-------+
您可能可以这样用Java编写
DataFrame addColumnsViaMap(DataFrame dataframe, List<String> words) {
for (String word: words) {
dataframe = dataframe.withColumn(word, lit((String) null))
}
return dataframe;
}
再一次,我没有基于Java的spark环境,但我的观点是:如果您掌握了原理,则重新编写很简单。