How to define a custom aggregation function to sum a column of Vectors?

Question

I have a DataFrame with two columns: ID of type Int and Vec of type Vector (org.apache.spark.mllib.linalg.Vector).

The DataFrame looks as follows:

ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....

I would like to do a groupBy($"ID") and then aggregate the rows inside each group by summing the vectors.

The desired output of the above example would be:

ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...

The available aggregation functions will not work, e.g. df.groupBy($"ID").agg(sum($"Vec")) will lead to a ClassCastException.

How can I implement a custom aggregation function that lets me sum vectors or arrays, or perform any other custom operation?

Solution

Spark >= 3.0

You can use Summarizer with sum:

import org.apache.spark.ml.stat.Summarizer

df
  .groupBy($"id")
  .agg(Summarizer.sum($"vec").alias("vec"))

Spark < 3.0

Personally I wouldn't bother with UDAFs. They are more than verbose and not exactly fast (see "Spark UDAF with ArrayType as bufferSchema performance issues"). Instead I would simply use reduceByKey / foldByKey:

import org.apache.spark.sql.Row
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.ml.linalg.{Vector, Vectors}
// Needed for $"..." and rdd.toDF; already in scope in spark-shell
import spark.implicits._

def dv(values: Double*): Vector = Vectors.dense(values.toArray)

val df = spark.createDataFrame(Seq(
    (1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)),
    (2, dv(7,5,0)), (2, dv(3,3,4)),
    (3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7)))
  ).toDF("id", "vec")

val aggregated = df
  .rdd
  .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  .foldByKey(BDV.zeros[Double](3))(_ += _)
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")

aggregated.show

// +---+--------------+
// | id|           vec|
// +---+--------------+
// |  1| [5.0,2.0,7.0]|
// |  2|[10.0,8.0,4.0]|
// |  3|[7.0,15.0,9.0]|
// +---+--------------+
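
The text above mentions reduceByKey, but only foldByKey is shown. As a rough sketch of the reduceByKey variant without Breeze, the merge function could be a plain element-wise array sum. The names addArrays and sumByKey are hypothetical, and the local groupBy is only a stand-in for Spark's shuffle so that the logic can be run without a cluster:

```scala
object ReduceByKeySketch {
  // Hypothetical helper: element-wise sum of two equal-length arrays.
  def addArrays(a: Array[Double], b: Array[Double]): Array[Double] = {
    require(a.length == b.length, s"length mismatch: ${a.length} vs ${b.length}")
    val out = new Array[Double](a.length)
    var i = 0
    while (i < a.length) {
      out(i) = a(i) + b(i)
      i += 1
    }
    out
  }

  // Local stand-in for rdd.reduceByKey(addArrays): group rows by key,
  // then reduce the arrays of each group with addArrays.
  def sumByKey(rows: Seq[(Int, Array[Double])]): Map[Int, Array[Double]] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(addArrays) }

  def main(args: Array[String]): Unit = {
    // Same data as the DataFrame in the question.
    val rows = Seq(
      (1, Array(0.0, 0.0, 5.0)), (1, Array(4.0, 0.0, 1.0)), (1, Array(1.0, 2.0, 1.0)),
      (2, Array(7.0, 5.0, 0.0)), (2, Array(3.0, 3.0, 4.0)),
      (3, Array(0.0, 8.0, 1.0)), (3, Array(0.0, 0.0, 1.0)), (3, Array(7.0, 7.0, 7.0))
    )
    sumByKey(rows).toSeq.sortBy(_._1).foreach { case (id, v) =>
      val text = v.mkString("[", ",", "]")
      println(s"$id,$text")
    }
  }
}
```

With Spark, the same function would be plugged in roughly as df.rdd.map { case Row(k: Int, v: Vector) => (k, v.toArray) }.reduceByKey(ReduceByKeySketch.addArrays).mapValues(a => Vectors.dense(a)).toDF("id", "vec"). Unlike the foldByKey version, this sketch allocates a new array on every merge instead of mutating an accumulator in place, and it does not need to know the vector length up front.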

And just for comparison, a "simple" UDAF (note that UserDefinedAggregateFunction is deprecated since Spark 3.0). Required imports:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
  UserDefinedAggregateFunction}
import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes}
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray

Class definition:

class VectorSum (n: Int) extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("v", SQLDataTypes.VectorType)
    def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
    def dataType = SQLDataTypes.VectorType
    def deterministic = true

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Array.fill(n)(0.0))
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        val buff = buffer.getAs[WrappedArray[Double]](0)
        val v = input.getAs[Vector](0).toSparse
        for (i <- v.indices) {
          buff(i) += v(i)
        }
        buffer.update(0, buff)
      }
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      val buff1 = buffer1.getAs[WrappedArray[Double]](0)
      val buff2 = buffer2.getAs[WrappedArray[Double]](0)
      for ((x, i) <- buff2.zipWithIndex) {
        buff1(i) += x
      }
      buffer1.update(0, buff1)
    }

    def evaluate(buffer: Row) = Vectors.dense(
      buffer.getAs[Seq[Double]](0).toArray)
}

And an example usage:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show

// +---+--------------+
// | id|           vec|
// +---+--------------+
// |  1| [5.0,2.0,7.0]|
// |  2|[10.0,8.0,4.0]|
// |  3|[7.0,15.0,9.0]|
// +---+--------------+
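
To see how the initialize, update and merge callbacks of VectorSum cooperate, the buffer logic can be imitated without Spark. This is only an illustrative sketch: the Sparse case class is a hypothetical stand-in for a sparse vector, the two sequences play the role of two partitions holding the rows for id = 1, and the real UDAF works on MutableAggregationBuffer rather than on plain arrays.

```scala
object UdafLifecycleSketch {
  // Hypothetical stand-in for a sparse vector: active indices plus their values.
  final case class Sparse(size: Int, indices: Array[Int], values: Array[Double])

  // initialize: a zeroed buffer of length n.
  def initialize(n: Int): Array[Double] = Array.fill(n)(0.0)

  // update: add one input row into the buffer, touching only the active indices.
  def update(buff: Array[Double], v: Sparse): Array[Double] = {
    var k = 0
    while (k < v.indices.length) {
      buff(v.indices(k)) += v.values(k)
      k += 1
    }
    buff
  }

  // merge: add the second partial buffer element-wise into the first.
  def merge(b1: Array[Double], b2: Array[Double]): Array[Double] = {
    for ((x, i) <- b2.zipWithIndex) b1(i) += x
    b1
  }

  def main(args: Array[String]): Unit = {
    // Rows for id = 1 ([0,0,5], [4,0,1], [1,2,1]) split across two pretend partitions.
    val p1 = Seq(Sparse(3, Array(2), Array(5.0)), Sparse(3, Array(0, 2), Array(4.0, 1.0)))
    val p2 = Seq(Sparse(3, Array(0, 1, 2), Array(1.0, 2.0, 1.0)))

    // Each partition starts from its own zeroed buffer and folds its rows in.
    val partial1 = p1.foldLeft(initialize(3))(update)
    val partial2 = p2.foldLeft(initialize(3))(update)

    // The partial buffers are then merged; evaluate would wrap this in a dense vector.
    println(merge(partial1, partial2).mkString("[", ",", "]")) // prints [5.0,2.0,7.0]
  }
}
```

Converting to a sparse representation in update means zero entries are skipped, which is the reason the UDAF calls toSparse before iterating over v.indices.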

See also: "How to find mean of grouped Vector columns in Spark SQL?"

