使用时,需要修改K值,args值

运行流程:

先初始化中心点->map中和距离最近的中心点生成一对传入reduce->reduce中把相同key值的存到一起->更新中心点,计算和上一次的结果偏差是否达到要求,没有的话继续迭代

因为一般来说都是读取文件操作,所以使用String操作更方便

package kmeans;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class KMeans {
/**************
* agrs[0]:hdfs://localhost:9000/KMeans/cluster
* args[1]:hdfs://localhost:9000/KMeans/center
* args[2]:hdfs://localhost:9000/KMeans/output
* hdfs://localhost:9000/KMeans/center/center为质心点存放文件,每轮都会更新
**********/
public static void main(String[] args) throws Exception {
CenterInitial centerInitial = new CenterInitial();
centerInitial.run(args);
int times = 0;
double s = 0, shold = 0.001,temp = Integer.MAX_VALUE;;
do {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
@SuppressWarnings("deprecation")
Job job = new Job(conf, "KMeans");
job.setJarByClass(KMeans.class); // 设置job所在的类在哪个jar包 job.setMapperClass(KMapper.class); // 指定job所用的map类 reducer类
job.setReducerClass(KReducer.class); job.setMapOutputKeyClass(Text.class); // map阶段的输出的key
job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class);// reduce阶段的输出的key
job.setOutputValueClass(Text.class); FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(args[2]), true);//输出路径下不能存在要输出的文件 否则报错,每次迭代要删除这个文件 FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[2])); if (job.waitForCompletion(true)) {
NewCenter newCenter = new NewCenter();
s = newCenter.run(args);
if(Math.abs(s - temp) < shold)
break;
else
temp = s;
times++;
}
} while (s > shold); // shold为阀值,当迭代后基本上收敛就可以停止了
System.out.println("Iterator: " + times); // 打印迭代次数
}
}

入口类

package kmeans;

//**************************    初始化质心(随机生成k个质心)   *************************

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; public class CenterInitial { public void run(String[] args) throws IOException {
String[] clist;
int k = 5; // ---------随机生成5个质心点-----------
String string = ""; // string 存放k个顶点,形式如a,b c,d e,f ......,中间以空格分开 String inpath = args[0] + "/testData";
String outpath = args[1] + "/center"; Configuration conf = new Configuration(); // 读取hadoop文件系统的配置
// conf1.set("hadoop.job.ugi", "hadoop,hadoop");
FileSystem fs = FileSystem.get(URI.create(inpath), conf); // FileSystem是用户操作HDFS的核心类,它获得URI对应的HDFS文件系统
FSDataInputStream in = null; ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
in = fs.open(new Path(inpath));
IOUtils.copyBytes(in, out, 50, false); // 用Hadoop的IOUtils工具方法来让这个文件的指定字节复制到标准输出流上 clist = out.toString().split(" "); // clist是字符串数组,每一个元素如(a,b)这样的形式
/*
* ByteArrayOutputStream此类实现了一个输出流,其中的数据被写入一个 byte 数组。 缓冲区会随着数据的不断写入而自动增长。可使用
* toByteArray() 和 toString() 获取数据
*/
} finally {
IOUtils.closeStream(in); // out是ByteArrayOutputStream,关闭无效。这里我们手动关闭in
} FileSystem filesystem = FileSystem.get(URI.create(outpath), conf); for (int i = 0; i < k; i++) {
int j = (int) (Math.random() * 100) % clist.length;
if (string.contains(clist[j])) // k个初始 顶点不能重复,若重复此次循环作废(continue),增加一次循环(k++)以保证
{ // 有k个顶点
k++;
continue;
}
string = string + clist[j].replace(" ", "") + " ";
}
/*
* 如果没有重复的,处理每一个点——去掉空格,使其符合输入格式。 for example,( a, b )处理完后把所有空格都去除掉,变成(a,b)
* 7个质心点中间以空格分开,最后string格式为(a,b) (c,d) (e,f)...
*/
OutputStream out2 = filesystem.create(new Path(outpath));
IOUtils.copyBytes(new ByteArrayInputStream(string.getBytes()), out2, 4096, true); // write string
/*
* string写到center文件中,这里out2是个FSDataOutputStream,指向/KMeans/center/center
* 用字节数组流从字节数组中读取string,IOUtils把这个流复制给FSDataOutputStream
*/
System.out.println(string); // 在屏幕显示出随机生成的这k个点
} }

初始化中心点

package kmeans;

/*
* map的工作主要是把测试样例所有的点划分到k个类中,中间键值对的格式
* 为(key:center ,value:r若干属于center的点)
*/
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; public class KMapper extends Mapper<LongWritable, Text, Text, Text> { private String[] center; // 存放k个质点 protected void setup(Context context) throws IOException, InterruptedException
{
String centerlist = "hdfs://localhost:9000/KMeans/center/center"; // center文件
Configuration conf1 = new Configuration();
// conf1.set("hadoop.job.ugi", "hadoop-user,hadoop-user");
FileSystem fs = FileSystem.get(URI.create(centerlist), conf1); FSDataInputStream in = null;
ByteArrayOutputStream out = new ByteArrayOutputStream();
/*
* 下面的工作主要是从hdfs://localhost:9000/KMeans/center/center这个文件
* 中读出k个质点,并且放到center这个字符串数组里边
*/
try {
in = fs.open(new Path(centerlist));
IOUtils.copyBytes(in, out, 100, false);
center = out.toString().split(" ");
} finally {
IOUtils.closeStream(in); // 手动关闭FSDataInputStream
}
} public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
// *map函数每次读取文件的一行,key为当前行在文件的字节偏移位置,value 就是该行的内容
while (itr.hasMoreTokens()) // 一次取一个点,outValue对应一个点,(x,y)
{
String outValue = new String(itr.nextToken());
String[] list = outValue.split(","); // a,b转换成string类型的a和b
float min = Integer.MAX_VALUE;
int pos = 0;
/*
* 下面这段for循环是对于一个确定的点outValue,计算其与k个质心点的欧式距离
* 记录距离最小的质心点,并且更新最短距离为outValue和该质心点的距离
*/
for (int i = 0; i < center.length; i++) {
String[] centerStrings = center[i].split(",");
float distance = 0;
for (int j = 0; j < list.length; j++)
distance += (float) Math.pow((Float.parseFloat(list[j]) - Float.parseFloat(centerStrings[j])), 2);
if (min > distance) {
min = distance;
pos = i;
}
}
context.write(new Text(center[pos]), new Text(outValue));// 中间键值对 -> key:质心点,value:当前处理完的点,相同的质心点会送到同一个reducer
}
}
}

重写map类

package kmeans;

/*
* reduce的主要工作是求出每个类别新的质点
*/
import java.io.IOException; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class KReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
StringBuffer outVal = new StringBuffer("");
int count = 0;
String center = ""; // 用于存放用平均值更新后的质点
int length = key.toString().split(",").length;
float[] ave = new float[length];
for (int i = 0; i < length; i++)
ave[i] = 0; for (Text val : value) {
outVal.append(val.toString() + " "); // outVal存放了属于key质心点这个类的所有点
String[] tmp = val.toString().split(",");
for (int i = 0; i < tmp.length; i++)
ave[i] += Float.parseFloat(tmp[i]); // 所有点每个维度都各自相加,然后用于求平均值,更新质点
count++; // 统计属于key质点类的所有点个数
} for (int i = 0; i < length; i++) {
ave[i] = ave[i] / count; // 对于每个维度,求其平均值
if (i == 0)
center += ave[i] + ",";
else {
if (i == length - 1) // 如果是最后一个维度
center += ave[i];
else { // 如果是介于第一个维度和最后一个维度
center += ave[i] + ",";
}
}
}
context.write(new Text(center), new Text(outVal.toString()));
/*
* reduce函数输出键值对格式为 key:初始质点,value:属于初始质点类的所有点+(a,b)
* 其中,(a,b)是更新后的质点;属于初始质点类的所有点之间都有空格分开
*/
} }

重写reduce类

package kmeans;

//***************************更新质心(写入到center文件中),输出两次质心距离变化的大小***********************

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; public class NewCenter { int k = 5;
// -------------------------质心个数,要与CenterInitial的k相对应---------------------- float shold = Integer.MIN_VALUE;
/*
* shold先初始化为最小值,下面对于reduce传来的每行数据,都会计算这行数据的初始质心和更新后的质心之间的欧氏距离 这个距离存储
* 在局部变量temp里,shold会一直更新,当遇到temp比它大时,将temp的值赋值给shold 所以,shold中存放的是每个类更新后变化的最大值
*/
String[] line;
String newcenter = new String(""); // 类似CenterInitial中的string,存放k个新顶点,形式如(a,b) (c,d) (e,f) .....,中间以空格分开 public float run(String[] args) throws IOException, InterruptedException {
Configuration conf = new Configuration();
// conf.set("hadoop.job.ugi", "hadoop,hadoop");
FileSystem fs = FileSystem.get(URI.create(args[2] + "/part-r-00000"), conf);
FSDataInputStream in = null;
ByteArrayOutputStream out = new ByteArrayOutputStream();
try { in = fs.open(new Path(args[2] + "/part-r-00000"));
IOUtils.copyBytes(in, out, 50, false);
line = out.toString().split("\n");// 一行是一个reduce的输出,用"\n"分隔后,一个line的成员代表一个reduce的输出
} finally {
IOUtils.closeStream(in);
}
for (int i = 0; i < k; i++) // line按道理应该是有k个元素
{
/*
* 要注意,reduce函数输出的key,value之间是以/t分隔的,并不是空格! 所以要先替换掉 \t再以空格分隔
*/
String[] l = line[i].split("\t");
String[] clust = l[1].split(" "); // ----clust存储的是质点------------
float tmp = 0;
for (int j = 0; j < clust.length; j++) {
String center[] = l[0].split(",");
String point[] = clust[j].split(",");
float dis = 0;
for(int k=0; k<center.length; k++)
{
dis += Math.pow(Float.parseFloat(center[k]) - Float.parseFloat(point[k]), 2);
}
tmp += dis;
}
newcenter = newcenter + l[0] + " ";
if (shold <= tmp)
shold = tmp;
}
OutputStream out2 = fs.create(new Path(args[1] + "/center"));
/*********
* args[1]:hdfs://localhost:9000/KMeans/center,每轮会更新
************/
IOUtils.copyBytes(new ByteArrayInputStream(newcenter.getBytes()), out2, 4096, true);
System.out.println("newcenter" +newcenter);
return shold;
}
}

更新中点

随机生成5个二维点的簇

package hello;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random; public class NewData { public static void main(String[] args) {
Random rd = new Random();
try {
FileOutputStream out = new FileOutputStream(new File("testData"));
BufferedOutputStream Buff = new BufferedOutputStream(out);
for (int i = 1; i <= 5; i++) {
for (int j = 1; j <= 100; j++) {
int x = rd.nextInt(500) + 500 * i;
int y = rd.nextInt(500) + 500 * i;
String str = "";
str = x + "," + y + " ";
Buff.write(str.getBytes());
}
Buff.write("\n".getBytes());
}
Buff.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("ok~");
}
}

自动生成测试数据

04-25 08:33