import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
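/**
 * Data source: the Iris data set.
 * It is available from the UCI Machine Learning Repository, which also hosts many other
 * commonly used machine-learning data sets:
 * http://archive.ics.uci.edu/ml/datasets/Iris
 */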
/**
 * Minimal k-nearest-neighbours (kNN) classifier.
 *
 * Training and test samples are read from comma-separated text files through Spark,
 * collected to the driver, and every test sample is classified by a majority vote
 * among its `k` closest training samples (Euclidean distance).
 */
object knn {

  /** Default input locations and neighbour count, used when no command-line override is given. */
  private val DefaultTestPath = "C:/Users/Desktop/MLData/knntest.txt"
  private val DefaultTrainPath = "C:/Users/Desktop/MLData/knntrain.txt"
  private val DefaultK = 3

  /**
   * Classifies every sample of the test file and prints one predicted label per line.
   *
   * @param args optional overrides: `args(0)` = test file path, `args(1)` = training file path,
   *             `args(2)` = number of neighbours; any missing argument falls back to its default
   */
  def main(args: Array[String]): Unit = {
    val testPath = args.lift(0).getOrElse(DefaultTestPath)
    val trainPath = args.lift(1).getOrElse(DefaultTrainPath)
    val knumber = args.lift(2).map(_.toInt).getOrElse(DefaultK)

    val conf = new SparkConf().setMaster("local").setAppName("ML")
    val sc = new SparkContext(conf)
    try {
      // Test rows: the first four columns are the features.
      // Blank lines are skipped so that they do not fail the column indexing below.
      val testdata = sc.textFile(testPath)
        .filter(_.trim.nonEmpty)
        .map { line =>
          val fields = line.split(',')
          Array(fields(0), fields(1), fields(2), fields(3))
        }
        .collect()

      // Training rows: the fifth column is the label; it is moved to the front so that
      // each row has the layout expected by `k` (label first, then the features).
      val traindata = sc.textFile(trainPath)
        .filter(_.trim.nonEmpty)
        .map { line =>
          val fields = line.split(',')
          Array(fields(4), fields(0), fields(1), fields(2), fields(3))
        }
        .collect()

      testdata
        .map(sample => k(sample, traindata, knumber, sc))
        .foreach(println)
    } finally {
      // Always release the Spark context, even when reading or parsing fails.
      sc.stop()
    }
  }

  /**
   * Predicts the label of one test sample by majority vote among its nearest training samples.
   *
   * Row layouts:
   *  - `trainData` rows: `label,5.1,3.5,1.4,0.2` (label first, then the features)
   *  - `testData`:       `5.1,3.5,1.4,0.2`       (features only)
   *
   * Labels are parsed as numbers, so the returned string is the `Double` rendering of the
   * winning label (for example `"1.0"` for the label `1`).
   *
   * @param testData  feature values of the sample to classify, as strings
   * @param trainData labeled training rows, as strings
   * @param knumber   number of nearest neighbours that take part in the vote; must be positive
   * @param sc        no longer used: the vote over at most `knumber` labels is computed locally
   *                  instead of through a Spark job; kept so that existing callers still compile
   * @return the most frequent label among the `knumber` nearest training samples; when several
   *         labels are tied, the one whose sample is closest to `testData` wins
   * @throws IllegalArgumentException if `knumber` is not positive or `trainData` is empty
   * @throws NumberFormatException    if a feature or a label cannot be parsed as a number
   */
  def k(testData: Array[String], trainData: Array[Array[String]], knumber: Int, sc: SparkContext): String = {
    require(knumber > 0, s"knumber must be positive, got $knumber")
    require(trainData.nonEmpty, "trainData must contain at least one labeled sample")

    val query = testData.map(_.toDouble)

    // Labels of the knumber closest training samples, nearest first.
    // sortWith is stable, so equally distant samples keep their training-file order.
    val nearestLabels = trainData
      .map { row =>
        val label = row(0).toDouble.toString
        val distance = caldis(query, row.drop(1).map(_.toDouble))
        (label, distance)
      }
      .sortWith(_._2 < _._2)
      .take(knumber)
      .map(_._1)

    val votes = nearestLabels.groupBy(identity).map { case (label, hits) => label -> hits.length }

    // maxBy keeps the first maximal element, so iterating the labels in nearest-first order
    // resolves ties in favour of the closest neighbour.
    nearestLabels.maxBy(label => votes(label))
  }

  /**
   * Computes the Euclidean distance between two points.
   *
   * The computation iterates over the coordinates of `x1`; `x2` must therefore have at least
   * as many coordinates as `x1`, otherwise an `ArrayIndexOutOfBoundsException` is thrown.
   *
   * @param x1 n-dimensional coordinates of the first point
   * @param x2 n-dimensional coordinates of the second point
   * @return the Euclidean distance between the two points
   */
  def caldis(x1: Array[Double], x2: Array[Double]): Double = {
    val sumOfSquares = x1.indices.foldLeft(0.0) { (acc, i) =>
      val diff = x1(i) - x2(i)
      acc + diff * diff
    }
    math.sqrt(sumOfSquares)
  }
}