1. KMeans 是一个迭代求解的聚类算法。
  2. 其属于划分(Partitioning)型的聚类方法,即首先创建K个划分,然后迭代地将样本从一个划分转移到另一个划分来改善最终聚类的质量。

ML包下的KMeans方法位于org.apache.spark.ml.clustering包下,其过程大致如下:

  1. 根据给定的k值,选取k个样本点作为初始划分中心
  2. 计算所有样本点到每一个划分中心的距离,并将所有样本点划分到距离最近的划分中心
  3. 计算每个划分中样本点的平均值,将其作为新的中心;循环进行2~3步直至达到最大迭代次数,或划分中心的变化小于某一预定义阈值

数据集:使用UCI数据集中的鸢尾花数据Iris进行实验,它可以在iris获取,Iris数据的样本容量为150,有四个实数值的特征,分别代表花朵四个部位的尺寸,以及该样本对应鸢尾花的亚种类型(共有3种亚种类型)

5.1,3.5,1.4,0.2,setosa
...
5.4,3.0,4.5,1.5,versicolor
...
7.1,3.0,5.9,2.1,virginica
...

在使用前,引入需要的包:  

import org.apache.spark.ml.clustering.{KMeans,KMeansModel}
import org.apache.spark.ml.linalg.Vectors

开启RDD的隐式转换:  

import spark.implicits._

为了便于生成相应的DataFrame,这里定义一个名为model_instance的case class作为DataFrame每一行(一个数据样本)的数据类型  

scala> case class model_instance (features: Vector)
defined class model_instance

在定义数据类型完成后,即可将数据读入RDD[model_instance]的结构中,并通过RDD的隐式转换.toDF()方法完成RDD到DataFrame的转换:

scala> val rawData = sc.textFile("file:///usr/local/spark/iris.txt")
rawData: org.apache.spark.rdd.RDD[String] = iris.csv MapPartitionsRDD[48] at textFile at <console>:33
 
scala> val df = rawData.map(line =>
| { model_instance( Vectors.dense(line.split(",").filter(p => p.matches("\\d*(\\.?)\\d*"))
| .map(_.toDouble)) )}).toDF()
df: org.apache.spark.sql.DataFrame = [features: vector]

在得到数据后,我们即可通过ML包的固有流程:创建Estimator并调用其fit()方法来生成相应的Transformer对象,很显然,在这里KMeans类是Estimator,而用于保存训练后模型的KMeansModel类则属于Transformer

12-16 23:51