Spark SQL nested withColumn

Question

I have a DataFrame with multiple columns, some of which are structs. Something like this:

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |-- abc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- def: struct (nullable = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- c: string (nullable = true)

I want to apply a UserDefinedFunction to the nested column baz, replacing baz with a function of baz, but I cannot figure out how to do that. Here is an example of the desired output (note that baz is now an integer):

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: integer (nullable = true)
 |-- abc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- def: struct (nullable = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- c: string (nullable = true)

It looks like DataFrame.withColumn only works on top-level columns, not on nested ones. I'm using Scala for this problem.

Can someone help me out?

Thanks

Recommended answer

That's easy: just use a dot to select a field of a nested struct, e.g. $"foo.baz":

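// Assumes a SparkSession named spark, as in spark-shell
import org.apache.spark.sql.functions.{col, struct, udf}
import spark.implicits._  // enables toDF() and the $"..." column syntax

case class Foo(bar: String, baz: String)
case class Record(foo: Foo)

val df = Seq(
  Record(Foo("Hi", "There"))
).toDF()

df.printSchema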

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)


val myUDF = udf((s: String) => {
  // do something with s
  s.toUpperCase
})


df
.withColumn("udfResult",myUDF($"foo.baz"))
.show

+----------+---------+
|       foo|udfResult|
+----------+---------+
|[Hi,There]|    THERE|
+----------+---------+

If you want to add the result of your UDF to the existing struct foo, i.e. to get:

root
 |-- foo: struct (nullable = false)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |    |-- udfResult: string (nullable = true)

there are two options:

withColumn:

df
.withColumn("udfResult",myUDF($"foo.baz"))
.withColumn("foo",struct($"foo.*",$"udfResult"))
.drop($"udfResult")

select:

df
.select(struct($"foo.*",myUDF($"foo.baz").as("udfResult")).as("foo"))

Replacing an existing attribute in the struct with the result of the UDF: unfortunately, this does not work, because withColumn treats "foo.baz" as the name of a new top-level column rather than as a path into the struct:

df
.withColumn("foo.baz",myUDF($"foo.baz")) 
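
To see what that call actually produces, the following minimal sketch rebuilds a DataFrame of the same shape and lists the resulting columns. The local SparkSession and the tuple-based sample data are assumptions made so the snippet runs on its own; they are not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, udf}

// Assumption: a local session for experimenting; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[1]").appName("dotted-name").getOrCreate()
import spark.implicits._

// Same shape as the answer's df: a single struct column foo(bar, baz)
val df = Seq(("Hi", "There")).toDF("bar", "baz")
  .select(struct($"bar", $"baz").as("foo"))

val myUDF = udf((s: String) => s.toUpperCase)

val attempt = df.withColumn("foo.baz", myUDF($"foo.baz"))

// The struct foo is left as it was; the UDF result lands in a separate
// top-level column whose name happens to contain a dot.
attempt.printSchema()
println(attempt.columns.mkString(", "))
```

Because no top-level column is called "foo.baz", withColumn appends one instead of replacing anything, which is why the struct has to be rebuilt explicitly.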

But it can be done like this:

// get all columns except foo.baz
val structCols = df.select($"foo.*")
    .columns
    .filter(_!="baz")
    .map(name => col("foo."+name))

df.withColumn(
    "foo",
    struct((structCols:+myUDF($"foo.baz").as("baz")):_*)
)
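
On Spark 3.1 or later, Column.withField offers a shorter route: it returns a copy of the struct with one field added or replaced by name. The sketch below is not part of the original answer. The local SparkSession, the sample data, and the bazLength UDF (which turns baz into an integer, as the question asks) are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, udf}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Assumption: a local session for experimenting; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[1]").appName("nested-withField").getOrCreate()
import spark.implicits._

// Same shape as the answer's df: a single struct column foo(bar, baz)
val df = Seq(("Hi", "There")).toDF("bar", "baz")
  .select(struct($"bar", $"baz").as("foo"))

// Hypothetical UDF: maps the string baz to its length; None becomes null
val bazLength = udf((s: String) => Option(s).map(_.length))

// withField (Spark 3.1+) replaces baz inside foo and leaves bar untouched
val replaced = df.withColumn("foo", $"foo".withField("baz", bazLength($"foo.baz")))

replaced.printSchema()
```

Unlike the struct-rebuilding approach, this does not require listing the other fields, so it stays the same when foo gains more attributes. It is only an option when the cluster runs Spark 3.1 or newer.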


10-22 08:06