This article looks at a Reducer that does not give the correct output in K-Means clustering of three variables using Apache Hadoop MapReduce, and may be a useful reference for anyone hitting the same problem.
Problem description
The output from the values iterator in the reduce method shows all points as zeros. Is there any flaw in the reduce method?
import java.io.IOException;
import java.util.*;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

@SuppressWarnings("deprecation")
public class KMEANS {
    public static String OUT = "OUT";
    public static String IN = "IN";
    public static String CENTROID_FILE_NAME = "/centroid.txt";
    public static String OUTPUT_FILE_NAME = "/part-00000";
    public static String DATA_FILE_NAME = "/data.txt";
    public static String JOB_NAME = "KMeans";
    public static String SPLITTER = "\t| ";
    public static List<Point> mCenters = new ArrayList<Point>();
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, DoubleWritable, Point> {

        @Override
        public void configure(JobConf job) {
            // System.out.println("Second");
            try {
                // Fetch the file from the Distributed Cache, read it and store
                // the centroids in the ArrayList
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
                if (cacheFiles != null && cacheFiles.length > 0) {
                    String line;
                    mCenters.clear();
                    BufferedReader cacheReader = new BufferedReader(
                            new FileReader(cacheFiles[0].toString()));
                    try {
                        while ((line = cacheReader.readLine()) != null) {
                            String[] temp = line.split(SPLITTER);
                            String[] temp2 = temp[0].split(",");
                            mCenters.add(new Point(
                                    Double.parseDouble(temp2[0]),
                                    Double.parseDouble(temp2[1]),
                                    Double.parseDouble(temp2[2])));
                            // System.out.println(mCenters.get(0).toString());
                        }
                    } finally {
                        cacheReader.close();
                    }
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }

        public void map(LongWritable key, Text value,
                OutputCollector<DoubleWritable, Point> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            String temp[] = line.split(",");
            Point point = new Point(Double.parseDouble(temp[0]),
                    Double.parseDouble(temp[1]), Double.parseDouble(temp[2]));
            // System.out.println(point.toString());
            double min1, min2 = Double.MAX_VALUE;
            Point nearest_center = mCenters.get(0);
            // Find the nearest center for the point (distance is measured on
            // the z coordinate only)
            for (Point c : mCenters) {
                min1 = c.z - point.z;
                if (Math.abs(min1) < Math.abs(min2)) {
                    nearest_center = c;
                    min2 = min1;
                }
            }
            // Emit the nearest center and the point
            output.collect(new DoubleWritable(nearest_center.z),
                    new Point(point));
        }
    }
    public static class Reduce extends MapReduceBase implements
            Reducer<DoubleWritable, Point, Point, Text> {

        public void reduce(DoubleWritable key, Iterator<Point> values,
                OutputCollector<Point, Text> output, Reporter reporter)
                throws IOException {
            Point newCenter = new Point(0, 0, 0);
            Point sum = new Point(0, 0, 0);
            int no_elements = 0;
            String points = "";
            while (values.hasNext()) {
                Point d = values.next().get();
                points = points + " " + d.toString();
                sum.z = sum.z + d.z;
                sum.x = sum.x + d.x;
                sum.y = sum.y + d.y;
                ++no_elements;
            }
            // Find new center
            newCenter.z = sum.z / no_elements;
            newCenter.x = sum.x / no_elements;
            newCenter.y = sum.y / no_elements;
            // Emit new center and point
            output.collect(new Point(newCenter), new Text(points));
        }
    }
    public static void main(String[] args) throws Exception {
        run();
    }

    public static void run() throws Exception {
        IN = "IN";
        OUT = "OUT";
        String input = IN;
        String output = OUT;
        String again_input = output;
        boolean isdone = false;
        int iteration = 0;
        // Reiterate until convergence
        while (isdone == false) {
            JobConf conf = new JobConf(KMEANS.class);
            if (iteration == 0) {
                Path hdfsPath = new Path(input + CENTROID_FILE_NAME);
                // upload the file to hdfs. Overwrite any existing copy.
                DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
            } else {
                Path hdfsPath = new Path(again_input + OUTPUT_FILE_NAME);
                // upload the file to hdfs. Overwrite any existing copy.
                DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
            }
            conf.setJobName(JOB_NAME);
            conf.setMapOutputKeyClass(DoubleWritable.class);
            conf.setMapOutputValueClass(Point.class);
            conf.setOutputKeyClass(Point.class);
            conf.setOutputValueClass(Text.class);
            conf.setMapperClass(Map.class);
            conf.setReducerClass(Reduce.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            FileInputFormat.setInputPaths(conf,
                    new Path(input + DATA_FILE_NAME));
            FileOutputFormat.setOutputPath(conf, new Path(output));
            JobClient.runJob(conf);
            // System.out.println("First");

            // Read the centroids produced by this iteration
            Path ofile = new Path(output + OUTPUT_FILE_NAME);
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    fs.open(ofile)));
            List<Point> centers_next = new ArrayList<Point>();
            String line = br.readLine();
            while (line != null) {
                String[] sp = line.split("\t| ");
                String[] temp = sp[0].split(",");
                Point c = new Point(Double.parseDouble(temp[0]),
                        Double.parseDouble(temp[1]),
                        Double.parseDouble(temp[2]));
                centers_next.add(c);
                line = br.readLine();
            }
            br.close();

            // Read the centroids that were used as input to this iteration
            String prev;
            if (iteration == 0)
                prev = input + CENTROID_FILE_NAME;
            else
                prev = again_input + OUTPUT_FILE_NAME;
            Path prevfile = new Path(prev);
            FileSystem fs1 = FileSystem.get(new Configuration());
            BufferedReader br1 = new BufferedReader(new InputStreamReader(
                    fs1.open(prevfile)));
            List<Point> centers_prev = new ArrayList<Point>();
            String l = br1.readLine();
            while (l != null) {
                String[] sp1 = l.split(SPLITTER);
                String temp[] = sp1[0].split(",");
                Point d = new Point(Double.parseDouble(temp[0]),
                        Double.parseDouble(temp[1]),
                        Double.parseDouble(temp[2]));
                centers_prev.add(d);
                l = br1.readLine();
            }
            br1.close();

            // Sort the old and new centroids and check the convergence
            // condition
            List<Double> prev_z = new ArrayList<Double>();
            List<Double> next_z = new ArrayList<Double>();
            for (int i = 0; i < centers_next.size(); i++) {
                prev_z.add(centers_prev.get(i).z);
                next_z.add(centers_next.get(i).z);
            }
            Collections.sort(prev_z);
            Collections.sort(next_z);
            Iterator<Point> it = centers_prev.iterator();
            for (Point d : centers_next) {
                double temp = it.next().z;
                if (Math.abs(temp - d.z) <= 0.1) {
                    isdone = true;
                } else {
                    isdone = false;
                    break;
                }
            }
            ++iteration;
            again_input = output;
            output = OUT + System.nanoTime();
            // System.out.println("Third");
            // isdone = true;
        }
    }
}
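For reference, the mapper parses each data record as three comma-separated coordinates, and configure() takes the first tab- or space-delimited field of each centroid line and splits it on commas as well. The two input files under the IN directory are therefore presumably laid out roughly as follows (the numbers are made-up sample values, not data from the original post):

IN/data.txt, one point per line:

1.0,2.0,3.0
4.5,1.2,0.7
9.3,8.8,7.1

IN/centroid.txt, one initial centroid per line:

1.0,1.0,1.0
8.0,8.0,8.0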
This is the code for the Point class, which represents a 3D point.
public class Point {
    public double x;
    public double y;
    public double z;

    public Point(double x, double y, double z) {
        this.x = x;
        this.y = y;
        this.z = z;
    }

    public Point(Point a) {
        this.x = a.x;
        this.y = a.y;
        this.z = a.z;
    }

    Point get() {
        return this;
    }

    public String toString() {
        return this.x + "," + this.y + "," + this.z;
    }
}
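One thing worth checking for the all-zeros symptom: Hadoop serializes custom map output values through the Writable interface, which needs a no-argument constructor plus write() and readFields() implementations; if a value class cannot be deserialized properly on the reduce side, the reducer ends up working with default-initialized (all-zero) fields. Below is a minimal sketch of what a Writable-capable point class could look like, assuming that is the issue here; the class name WritablePoint is hypothetical and the wiring is not taken from the original post.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical sketch of a point class that Hadoop can serialize between map and reduce
public class WritablePoint implements Writable {
    public double x;
    public double y;
    public double z;

    // Hadoop creates Writable instances reflectively, so a no-arg constructor is required
    public WritablePoint() {
    }

    public WritablePoint(double x, double y, double z) {
        this.x = x;
        this.y = y;
        this.z = z;
    }

    // Called by the framework when the value is written to the map output
    public void write(DataOutput out) throws IOException {
        out.writeDouble(x);
        out.writeDouble(y);
        out.writeDouble(z);
    }

    // Called by the framework when the value is read back in the reducer
    public void readFields(DataInput in) throws IOException {
        x = in.readDouble();
        y = in.readDouble();
        z = in.readDouble();
    }

    @Override
    public String toString() {
        return x + "," + y + "," + z;
    }
}

Swapping a class like this in for Point (and keeping setMapOutputValueClass in sync) would at least rule out serialization as the cause of the zeroed values.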
Recommended answer

That is all for this article on the Reducer not giving the correct output in K-Means clustering of three variables using Apache Hadoop MapReduce; we hope it is helpful.