我知道使用.withColumn()
和UDF
向Spark DataSet添加新列的方法,该方法返回DataFrame。我也知道,我们可以将结果DataFrame转换为DataSet。
我的问题是:
如果我们仍然遵循传统的DF方法(即,将列名作为UDF输入的字符串传递),那么DataSet的类型安全如何在这里发挥作用
是否有一种像我们以前使用RDD一样访问列的“面向对象方式”(不将列名作为字符串传递),用于追加新列。
如何在常规操作(如地图,过滤器等)中访问新列?
例如:
scala> case class Temp(a : Int, b : String) //creating case class
scala> val df = Seq((1,"1str"),(2,"2str),(3,"3str")).toDS // creating DS
scala> val appendUDF = udf( (b : String) => b + "ing") // sample UDF
scala> df.withColumn("c",df("b")) // adding a new column
res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]
scala> res5.as[Temp] // converting to DS
res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field]
scala> res6.map( x =>x.
// list of autosuggestion :
a canEqual equals productArity productIterator toString
b copy hashCode productElement productPrefix
我无法访问使用
c
添加的新列.withColumn()
,因为列c
不在案例类Temp
(仅包含a
和b
)的情况下,使用res5.as[Temp]
转换为DS。如何访问列
c
? 最佳答案
在Dataset
的类型安全的世界中,您会将一个结构映射到另一个结构中。
也就是说,对于每个转换,我们都需要数据的模式表示(如RDD所需要的)。要访问上面的“ c”,我们需要创建一个提供对它的访问的新模式。
case class A(a:String)
case class BC(b:String, c:String)
val f:A => BC = a=> BC(a.a,"c") // Transforms an A into a BC
val data = (1 to 10).map(i => A(i.toString))
val dsa = spark.createDataset(data)
// dsa: org.apache.spark.sql.Dataset[A] = [a: string]
val dsb = dsa.map(f)
//dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string]