Problem description
As you may know, a DataFrame can contain fields of complex types, such as structures (StructType) or arrays (ArrayType). You may need, as in my case, to map all the DataFrame data to a Hive table with simple-typed fields (String, Integer, ...). I've been struggling with this issue for a long time, and I've finally found a solution I want to share. I'm sure it could be improved, so feel free to reply with your own suggestions.
It's based on this thread, but also works for ArrayType elements, not only StructType ones. It is a tail-recursive function which receives a DataFrame and returns it flattened.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDf(df: DataFrame): DataFrame = {
  var end = false
  var i = 0
  val fields = df.schema.fields
  val fieldNames = fields.map(f => f.name)
  val fieldsNumber = fields.length
  while (!end) {
    val field = fields(i)
    val fieldName = field.name
    field.dataType match {
      case st: StructType =>
        // Replace the struct column with one column per child field.
        val childFieldNames = st.fieldNames.map(n => fieldName + "." + n)
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val newDf = df.selectExpr(newFieldNames: _*)
        return flattenDf(newDf)
      case at: ArrayType =>
        // Explode the array into one row per element, then select its fields.
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode($fieldName) as a")
        val fieldNamesToSelect = fieldNamesExcludingArray ++ Array("a.*")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        val explodedAndSelectedDf = explodedDf.selectExpr(fieldNamesToSelect: _*)
        return flattenDf(explodedAndSelectedDf)
      case _ => () // simple type: leave the column as-is
    }
    i += 1
    end = i >= fieldsNumber
  }
  df
}
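To make the renaming scheme easier to follow (struct children become dotted parent.child columns, arrays are kept as a single column that explode turns into one row per element), here is a hypothetical, Spark-free sketch of the same recursion. The schema is modelled with plain case classes, so `Simple`, `Struct`, `Arr`, and `flattenNames` are illustrative names of my own, not Spark types or API.

```scala
// Spark-free model of a nested schema (illustrative types, not Spark's).
sealed trait FieldType
case object Simple extends FieldType
case class Struct(fields: List[(String, FieldType)]) extends FieldType
case class Arr(element: FieldType) extends FieldType

// Collect the dotted column names a flatten would produce; arrays keep a
// single column (Spark's explode turns them into one row per element).
def flattenNames(fields: List[(String, FieldType)], prefix: String = ""): List[String] =
  fields.flatMap { case (name, tpe) =>
    val full = if (prefix.isEmpty) name else s"$prefix.$name"
    tpe match {
      case Struct(children) => flattenNames(children, full)
      case _                => List(full) // Simple or Arr: one column
    }
  }

// The nested shape used later in the answer: ("1", (2, (3, 4)), Seq(1, 2))
val schema = List(
  "_1" -> Simple,
  "_2" -> Struct(List(
    "_1" -> Simple,
    "_2" -> Struct(List("_1" -> Simple, "_2" -> Simple)))),
  "_3" -> Arr(Simple)
)

println(flattenNames(schema))
// List(_1, _2._1, _2._2._1, _2._2._2, _3)
```

Running this shows exactly which flat columns the DataFrame ends up with; the real function then builds the corresponding `selectExpr`/`explode` calls from those names.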
Recommended answer
import spark.implicits._ // assumes an active SparkSession named `spark`

val df = Seq(("1", (2, (3, 4)), Seq(1, 2))).toDF()
df.printSchema
root
|-- _1: string (nullable = true)
|-- _2: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: struct (nullable = true)
| | |-- _1: integer (nullable = false)
| | |-- _2: integer (nullable = false)
|-- _3: array (nullable = true)
| |-- element: integer (containsNull = false)
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenSchema(schema: StructType, fieldName: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val cols = if (fieldName == null) f.name else fieldName + "." + f.name
    f.dataType match {
      case structType: StructType => flattenSchema(structType, cols)
      case arrayType: ArrayType   => Array(explode(col(cols)))
      case _                      => Array(col(cols))
    }
  })
}
df.select(flattenSchema(df.schema): _*).printSchema
root
|-- _1: string (nullable = true)
|-- _1: integer (nullable = true)
|-- _1: integer (nullable = true)
|-- _2: integer (nullable = true)
|-- col: integer (nullable = false)