Problem Description
I have a DataFrame of two columns, ID of type Int and Vec of type Vector (org.apache.spark.mllib.linalg.Vector).
The DataFrame looks like this:
ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....
I would like to do a groupBy($"ID") and then apply an aggregation on the rows inside each group by summing the vectors.
The desired output of the above example would be:
ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...
The available aggregation functions will not work; e.g. df.groupBy($"ID").agg(sum($"Vec")) will lead to a ClassCastException.
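For reference, this is the failing call from the question with the parenthesis balanced; the built-in sum aggregate is defined for numeric columns only, which is why it does not handle a Vector column:

// Fails at runtime (per the question, with a ClassCastException):
// sum() expects a numeric column, not a VectorUDT column
df.groupBy($"ID").agg(sum($"Vec"))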
How can I implement a custom aggregation function that lets me sum vectors or arrays, or perform any other custom operation?
Recommended Answer
Personally, I wouldn't bother with UDAFs. They are more than verbose and not exactly fast. Instead, I would simply use reduceByKey:
import org.apache.spark.sql.Row
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import sqlContext.implicits._  // needed for .toDF outside the spark-shell

val rdd = sc.parallelize(Seq(
  (1, "[0,0,5]"), (1, "[4,0,1]"), (1, "[1,2,1]"),
  (2, "[7,5,0]"), (2, "[3,3,4]"), (3, "[0,8,1]"),
  (3, "[0,0,1]"), (3, "[7,7,7]")))

// Parse the string representations into mllib Vectors
val df = rdd.map { case (k, v) => (k, Vectors.parse(v)) }.toDF("id", "vec")

// Convert each Vector to a Breeze DenseVector, sum per key, then convert back
val aggregated = df
  .map { case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  .reduceByKey(_ + _)
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")
aggregated.show
// +---+--------------+
// | id| vec|
// +---+--------------+
// | 1| [5.0,2.0,7.0]|
// | 2|[10.0,8.0,4.0]|
// | 3|[7.0,15.0,9.0]|
// +---+--------------+
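If you want to avoid allocating a fresh Breeze vector for every merge, foldByKey with a mutable zero buffer works the same way. A minimal sketch, assuming the same df and implicits as above and hard-coding the vector length 3 from the example (going through .rdd explicitly also keeps it working on Spark 2.x, where DataFrame.map requires an encoder; the name aggregatedFold is just for illustration):

val aggregatedFold = df
  .rdd
  .map { case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  .foldByKey(BDV.zeros[Double](3))(_ += _)  // += adds into the accumulator in place
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")

Spark copies the zero value for each key, so the in-place += does not leak state across keys.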
And, just for comparison, a "simple" UDAF:
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType}
import scala.collection.mutable.WrappedArray

class VectorSum(n: Int) extends UserDefinedAggregateFunction {
  def inputSchema = new StructType().add("v", new VectorUDT())
  def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
  def dataType = new VectorUDT()
  def deterministic = true

  // Start from a zero array of the expected vector length
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, Array.fill(n)(0.0))
  }

  // Add the non-zero entries of the incoming vector to the buffer
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0)) {
      val buff = buffer.getAs[WrappedArray[Double]](0)
      val v = input.getAs[Vector](0).toSparse
      for (i <- v.indices) {
        buff(i) += v(i)
      }
      buffer.update(0, buff)
    }
  }

  // Element-wise sum of two partial buffers
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val buff1 = buffer1.getAs[WrappedArray[Double]](0)
    val buff2 = buffer2.getAs[WrappedArray[Double]](0)
    for ((x, i) <- buff2.zipWithIndex) {
      buff1(i) += x
    }
    buffer1.update(0, buff1)
  }

  // Convert the final buffer back into an mllib Vector
  def evaluate(buffer: Row) = Vectors.dense(
    buffer.getAs[Seq[Double]](0).toArray)
}
And example usage:
df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show
// +---+--------------+
// | id| vec|
// +---+--------------+
// | 1| [5.0,2.0,7.0]|
// | 2|[10.0,8.0,4.0]|
// | 3|[7.0,15.0,9.0]|
// +---+--------------+
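If you also want the aggregate available from SQL, a UserDefinedAggregateFunction can be registered by name. A hedged sketch, assuming Spark 1.5/1.6, where udf.register accepts a UserDefinedAggregateFunction; the names vector_sum and vectors are placeholders:

// Register the UDAF under a SQL-callable name and expose df as a temp table
sqlContext.udf.register("vector_sum", new VectorSum(3))
df.registerTempTable("vectors")
sqlContext.sql("SELECT id, vector_sum(vec) AS vec FROM vectors GROUP BY id").show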