How to calculate a moving median in a DataFrame


Question

Is there a way to calculate a moving median for an attribute in a Spark DataFrame?

I was hoping it would be possible to calculate a moving median with a window function (defining the window with rowsBetween(0, 10)), but there is no built-in function for it, the way there is for average or mean.
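To pin down what a moving median over a row frame means, here is a minimal plain-Scala sketch (no Spark involved) for a single partition, assuming the rows are already in the intended order. The object and method names (MovingMedianSketch, median, movingMedian) are hypothetical. For each row i it takes rows i + start through i + end, clipped at the partition edges as a rowsBetween(start, end) frame is, and returns None for an empty frame, roughly where Spark would return null.

```scala
// Plain-Scala illustration (no Spark) of a moving median over a
// rowsBetween(start, end) frame within one already-ordered partition.
object MovingMedianSketch {

  // Median of one frame: the middle element after sorting, or the mean of
  // the two middle elements when the count is even. None for an empty frame.
  def median(xs: Seq[Double]): Option[Double] =
    if (xs.isEmpty) None
    else {
      val sorted = xs.sorted
      val n = sorted.size
      if (n % 2 == 0) Some((sorted(n / 2 - 1) + sorted(n / 2)) / 2)
      else Some(sorted(n / 2))
    }

  // For each row i, take rows i + start .. i + end (inclusive), clipped to
  // the partition bounds, and compute their median.
  def movingMedian(values: IndexedSeq[Double], start: Int, end: Int): IndexedSeq[Option[Double]] =
    values.indices.map { i =>
      val from = math.max(0, i + start)
      val until = math.min(values.size, i + end + 1)
      median(values.slice(from, until))
    }
}
```

For example, movingMedian(Vector(5.0, 1.0, 4.0, 2.0, 3.0), 0, 2) yields Some(4.0), Some(2.0), Some(3.0), Some(2.5), Some(3.0): the last two frames hold only two and one rows because they are clipped at the end of the partition, which is also why the even-count branch is needed even for a frame of odd nominal width.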

Answer

Here is a class that extends UserDefinedAggregateFunction to compute a moving median.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Collects every value in the window frame and returns the median of the collected values.
// Note: UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of Aggregator + functions.udaf.
class MyMedian extends UserDefinedAggregateFunction {
  // One Double input column.
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)

  // The buffer holds all values seen so far in the frame.
  def bufferSchema: StructType =
    StructType(StructField("window_list", ArrayType(DoubleType, containsNull = false)) :: Nil)

  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq.empty[Double]
  }

  // Append the incoming value; nulls are skipped rather than unboxed to 0.0.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getSeq[Double](0) :+ input.getDouble(0)
    }
  }

  // Combine two partial buffers. getSeq works for whatever Seq Spark returns;
  // casting the stored array to ArrayBuffer can throw a ClassCastException.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)
  }

  // Sort the collected values and take the middle one (or the mean of the two middle ones).
  def evaluate(buffer: Row): Any = {
    val sortedWindow = buffer.getSeq[Double](0).sorted
    val windowSize = sortedWindow.size
    if (windowSize == 0) {
      null // empty frame, e.g. every value in it was null
    } else if (windowSize % 2 == 0) {
      val index = windowSize / 2
      (sortedWindow(index) + sortedWindow(index - 1)) / 2
    } else {
      val index = (windowSize + 1) / 2 - 1
      sortedWindow(index)
    }
  }
}
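The index arithmetic in evaluate follows the usual median definition, written here with 0-based indices to match the code (x_(0) <= ... <= x_(n-1) denote the sorted frame values, n the frame size):

```latex
\operatorname{median}(x) =
\begin{cases}
  x_{((n-1)/2)} & n \text{ odd} \quad \text{(code: } (n+1)/2 - 1\text{)} \\[4pt]
  \tfrac{1}{2}\left(x_{(n/2 - 1)} + x_{(n/2)}\right) & n \text{ even} \quad \text{(code: index} - 1 \text{ and index} = n/2\text{)}
\end{cases}
```

For a full rowsBetween(-10, 10) frame, n = 21 and the result is x_(10), the 11th smallest value. At the first row of a partition the frame is clipped to rows 0 through 10 (n = 11), and at the second row to rows 0 through 11 (n = 12), so both branches are exercised in practice.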

Using the UDAF above:

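import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// Create an instance of the UDAF MyMedian.
val mm = new MyMedian

// A moving frame needs a defined row order; "ts" is a hypothetical ordering column.
val window = Window.partitionBy("GroupId").orderBy("ts").rowsBetween(-10, 10)
val movingMedianDS = dataSet.withColumn("MovingMedian", mm(col("value")).over(window))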


08-15 18:21