How to calculate a moving median in a DataFrame?
Problem description
Is there a way to calculate a moving median for an attribute in a Spark DataFrame?
I was hoping to calculate the moving median with a window function (defining the window with rowsBetween(0,10)), but there is no built-in function for it (nothing comparable to average or mean).
Recommended answer
Here is a class that extends UserDefinedAggregateFunction to compute a moving median.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Collects every value in the current window frame and returns their median.
// Note: UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of Aggregator.
class MyMedian extends UserDefinedAggregateFunction {
  // A single Double input column.
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  // The buffer stores all values seen so far in the frame.
  def bufferSchema: StructType =
    StructType(StructField("window_list", ArrayType(DoubleType, containsNull = false)) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq.empty[Double]
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getSeq[Double](0) :+ input.getAs[Double](0)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // Read both buffers as Seq: Spark returns a WrappedArray here, so casting
    // to ArrayBuffer (as the original code did) fails at runtime.
    buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)
  }

  def evaluate(buffer: Row): Any = {
    val sortedWindow = buffer.getSeq[Double](0).sorted
    val windowSize = sortedWindow.size
    if (windowSize % 2 == 0) {
      // Even size: average the two middle values.
      val index = windowSize / 2
      (sortedWindow(index) + sortedWindow(index - 1)) / 2
    } else {
      // Odd size: take the middle value.
      sortedWindow(windowSize / 2)
    }
  }
}
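The even/odd branch in evaluate can be checked without Spark. The sketch below is only an illustration: the object MedianSketch and the helper median are hypothetical names, not part of the answer, and the helper assumes a non-empty input.

```scala
object MedianSketch {
  // Hypothetical helper mirroring the even/odd branch of MyMedian.evaluate.
  def median(values: Seq[Double]): Double = {
    require(values.nonEmpty, "median of an empty window is undefined")
    val sorted = values.sorted
    val n = sorted.size
    if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0 // mean of the two middle values
    else sorted(n / 2)                                        // the single middle value
  }

  def main(args: Array[String]): Unit = {
    println(median(Seq(5.0, 1.0, 3.0)))      // odd size: prints 3.0
    println(median(Seq(4.0, 1.0, 3.0, 2.0))) // even size: prints 2.5
  }
}
```

Sorting the whole frame for every row costs O(n log n) per row, which is usually acceptable for small frames such as 21 rows.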
Example of using the UDAF above:
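import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
// Create an instance of the UDAF MyMedian.
val mm = new MyMedian
// Frame: 10 rows before through 10 rows after the current row, per GroupId. Add an orderBy to the window for a deterministic row order.
val movingMedianDS = dataSet.withColumn("MovingMedian", mm(col("value")).over(Window.partitionBy("GroupId").rowsBetween(-10, 10)))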
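To make the frame semantics concrete, here is a minimal plain-Scala sketch of what a row frame such as rowsBetween(-1, 1) produces for a single partition. It is an illustration rather than Spark code: MovingMedianSketch, movingMedian, and the sample values are all hypothetical, and the sketch assumes the rows are already in the intended order and that the frame always contains the current row.

```scala
object MovingMedianSketch {
  // Median of a non-empty sequence (same even/odd rule as the UDAF).
  private def median(values: Seq[Double]): Double = {
    val s = values.sorted
    val n = s.size
    if (n % 2 == 0) (s(n / 2 - 1) + s(n / 2)) / 2.0 else s(n / 2)
  }

  // For each row i, take rows i + start through i + end (inclusive),
  // clipped to the partition boundaries, and return the median of that frame.
  def movingMedian(rows: Seq[Double], start: Int, end: Int): Seq[Double] = {
    require(start <= 0 && end >= 0, "frame must contain the current row")
    rows.indices.map { i =>
      val from = math.max(0, i + start)
      val until = math.min(rows.size, i + end + 1)
      median(rows.slice(from, until))
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(1.0, 9.0, 2.0, 8.0, 3.0)
    // Frames: [1,9], [1,9,2], [9,2,8], [2,8,3], [8,3]
    println(movingMedian(rows, -1, 1)) // medians 5.0, 2.0, 8.0, 3.0, 5.5
  }
}
```

The first and last rows see shorter frames because the frame is clipped at the partition edges, which is also why the medians near the edges are computed over fewer values.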