Problem description
I have the following schema:
root
|-- id: string (nullable = false)
|-- age: long (nullable = true)
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)
|-- name: string (nullable = true)
How can I pass the struct 'cars' to a UDAF? What should the inputSchema be if I only want to pass the cars sub-struct?
Recommended answer
You can, but the logic of the UDAF will be different. For example, if you have two rows:
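case class cars_schema(car1: String, car2: String, car3: String) // assumed definition, matching the schema below
case class cars(cars: cars_schema)                                // assumed wrapper so the column is named "cars"
val seq = Seq(cars(cars_schema("car1", "car2", "car3")), cars(cars_schema("car1", "car2", "car3")))
val rdd = spark.sparkContext.parallelize(seq)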
The schema here is:
root
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)
Then, if you call the aggregation:
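import spark.implicits._                  // needed for .toDF on a Seq of case classes
import org.apache.spark.sql.functions.col
val df = seq.toDF
df.agg(agg0(col("cars")))                 // agg0 is your UDAF instance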
You must change your UDAF's input schema, for example:
import org.apache.spark.sql.types.{StringType, StructField, StructType}
val carsSchema = StructType(List(
  StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))
and in the body of your UDAF, use this schema as the type of the single input column in inputSchema:
override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)
In your update method you must handle the format of the input Rows:
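override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  if (!input.isNullAt(0)) {        // the cars struct is nullable
    val i = input.getStruct(0)     // a struct column arrives as a nested Row, not as an array
    // i holds car1, car2, car3 as three string fields, e.g. i.getString(0) is car1
    buffer(0) = ???                // update the buffer from i here
  }
}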
From here, you can use the fields of i to update your buffer, and then complete the merge and evaluate functions.
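A minimal end-to-end sketch, assuming the same UserDefinedAggregateFunction API the answer uses (deprecated since Spark 3.0 in favor of Aggregator, but still available). It builds a DataFrame with the question's four columns and passes only col("cars"), so inputSchema describes just the struct. The names CarsStruct, Person, CountCars and CountCarsExample are hypothetical, and counting the non-null car1/car2/car3 values is only an illustrative choice of aggregation.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Hypothetical case classes roughly mirroring the question's schema (id, age, cars, name).
case class CarsStruct(car1: String, car2: String, car3: String)
case class Person(id: String, age: Long, cars: CarsStruct, name: String)

// Hypothetical UDAF: counts the non-null car1/car2/car3 values across all input rows.
class CountCars extends UserDefinedAggregateFunction {
  private val carsSchema = StructType(List(
    StructField("car1", StringType, true),
    StructField("car2", StringType, true),
    StructField("car3", StringType, true)))

  // A single input column whose type is the whole cars struct.
  override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)
  // The buffer holds one running count.
  override def bufferSchema: StructType = StructType(StructField("count", LongType) :: Nil)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val i = input.getStruct(0) // the struct arrives as a nested Row
      val nonNull = (0 until i.length).count(idx => !i.isNullAt(idx))
      buffer(0) = buffer.getLong(0) + nonNull
    }
  }

  // Combine partial counts computed on different partitions.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)

  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

object CountCarsExample {
  // Builds a small DataFrame and aggregates only its cars struct column.
  def run(spark: SparkSession): Long = {
    import spark.implicits._
    val df = Seq(
      Person("a", 30L, CarsStruct("car1", "car2", "car3"), "Ann"),
      Person("b", 40L, CarsStruct("car1", null, null), "Bob")
    ).toDF()
    val countCars = new CountCars
    df.agg(countCars(col("cars"))).head().getLong(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("udaf-struct").getOrCreate()
    try println(run(spark)) // 4: three non-null cars for "a", one for "b"
    finally spark.stop()
  }
}
```

Because inputSchema only describes the struct, the other columns (id, age, name) never reach the UDAF. To aggregate per person instead of over the whole DataFrame, the same instance could be used as df.groupBy("id").agg(countCars(col("cars"))).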