问题描述
我想编写Spark UDAF,其中列的类型可以是在其上定义了Scala数字的任何列。我在Internet上进行了搜索,但仅找到具体类型为 DoubleType
, LongType
的示例。这不可能吗?但是,然后如何将UDAF与其他数值一起使用?
I want to write Spark UDAF where type of the column could be any that has a Scala Numeric defined on it. I've searched over Internet but found only examples with concrete types like DoubleType
, LongType
. Isn't this possible? But how then use that UDAFs with other numeric values?
推荐答案
为简单起见,我们假设您要定义自定义 sum
。您将为输入类型提供一个 TypeTag
并使用Scala反射定义模式:
For simplicity let's assume you want to define a custom sum
. You'll have provide a TypeTag
for the input type and use Scala reflection to define schemas:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import scala.reflect.runtime.universe._
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
case class MySum [T : TypeTag](implicit n: Numeric[T])
extends UserDefinedAggregateFunction {
val dt = schemaFor[T].dataType
def inputSchema = new StructType().add("x", dt)
def bufferSchema = new StructType().add("x", dt)
def dataType = dt
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, n.zero)
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, n.plus(buffer.getAs[T](0), input.getAs[T](0)))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, n.plus(buffer1.getAs[T](0), buffer2.getAs[T](0)))
}
def evaluate(buffer: Row) = buffer.getAs[T](0)
}
使用上面定义的函数,我们可以创建处理特定类型的实例:
With a function defined as above we can create instance handling specific types:
val sumOfLong = MySum[Long]
spark.range(10).select(sumOfLong($"id")).show
+---------+
|mysum(id)|
+---------+
| 45|
+---------+
注意:
要获得与内置聚合函数相同的灵活性,您必须定义自己的 AggregateFunction
,例如或。可能,但这是一个内部API。
To get the same flexibility as the built-in aggregate functions you'd have to define your own AggregateFunction
, like ImperativeAggregate
or DeclarativeAggregate
. It is possible, but it is an internal API.
这篇关于Spark UDAF-使用泛型作为输入类型?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!