How to convert an RDD[Row] to an RDD[Vector]
Question
I'm trying to implement the k-means method in Scala. I created an RDD like this:
val df = sc.parallelize(data).groupByKey().collect().map((chunk)=> {
sc.parallelize(chunk._2.toSeq).toDF()
})
val examples = df.map(dataframe =>{
dataframe.selectExpr(
"avg(time) as avg_time",
"variance(size) as var_size",
"variance(time) as var_time",
"count(size) as examples"
).rdd
})
val rdd_final=examples.reduce(_ union _)
val kmeans= new KMeans()
val model = kmeans.run(rdd_final)
With this code I get an error:
type mismatch;
[error] found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error] required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
So I tried:
val rdd_final_Vector = rdd_final.map{x:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)}
val model = kmeans.run(rdd_final_Vector)
But then I get an error:
java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector
So I'm looking for a way to do that conversion, but I can't find any method.
Any ideas?
Best regards
Recommended answer
There are at least two issues:
- No, you really cannot cast a Row to a Vector: a Row is a collection of potentially disparate types understood by Spark SQL. A Vector is not a native Spark SQL type.
- There seems to be a mismatch between the content of your SQL statement and what you are attempting to achieve with KMeans: the SQL is performing aggregations, but KMeans expects a series of individual data points, each in the form of a Vector (which encapsulates an Array[Double]). So why are you supplying sums and averages to a KMeans operation?
Addressing just #1 here: you will need to do something along the lines of:
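import org.apache.spark.mllib.linalg.Vectors
// Row.getDouble takes a column index; getAs[Double] looks a column up by name
val doubVals = <rows rdd>.map { row => row.getAs[Double]("colname") }
val vector = Vectors.dense(doubVals.collect())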
Then you have a properly encapsulated Array[Double] (within a Vector) that can be supplied to KMeans. Note that KMeans.run itself takes an RDD[Vector], so in practice each data point needs its own Vector.
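As a rough sketch of issue #1 applied row by row, the snippet below turns every Row into its own Vector, so the result is an RDD[Vector] rather than a single collected Vector. The helper names rowToVector and toVectors are hypothetical and not part of Spark. The sketch assumes every column in the Row is numeric, which holds for the avg, variance and count columns in the question (count yields a Long, so a plain getAs[Double] would fail on it).

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Hypothetical helper: convert one Row of numeric columns into a dense Vector.
// Matching on java.lang.Number covers Double, Long (e.g. count(...)), Int, etc.
def rowToVector(row: Row): Vector = {
  val values = row.toSeq.map {
    case n: java.lang.Number => n.doubleValue()
    // Nulls and non-numeric columns are rejected here; adapt this to your data.
    case other => throw new IllegalArgumentException(s"Non-numeric value: $other")
  }
  Vectors.dense(values.toArray)
}

// Hypothetical helper: one Vector per Row, giving the RDD[Vector] that KMeans.run expects.
def toVectors(rows: RDD[Row]): RDD[Vector] = rows.map(rowToVector)
```

With the question's rdd_final, usage might look like new KMeans().setK(2).run(toVectors(rdd_final).cache()), where k = 2 is an arbitrary value chosen for illustration. This only addresses the type mismatch; whether clustering aggregated statistics is what you want is the separate issue #2 above.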