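First of all, I'm a Java beginner, but I have to finish this MapReduce job as soon as possible.
I tried to adapt the WordCount example, because the problem is very similar.
My input is a text file with one column of data, like this: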
Date:2008-10-23Hour:02User:000 1
Date:2008-10-23Hour:02User:000 0
Date:2008-10-23Hour:02User:000 1
Date:2008-10-23Hour:02User:000 1
Date:2008-10-23Hour:02User:000 0
Date:2008-10-23Hour:02User:000 1
Date:2008-10-23Hour:02User:000 1
Date:2008-10-23Hour:02User:000 0
Date:2008-10-23Hour:02User:000 1
Date:2008-10-23Hour:02User:000 1
Date:2008-10-23Hour:02User:000 1
Date:2008-10-23Hour:03User:000 0
Date:2008-10-23Hour:03User:000 1
Date:2008-10-23Hour:03User:000 1
Date:2008-10-23Hour:03User:000 0
Date:2008-10-23Hour:03User:000 1
Date:2008-10-23Hour:03User:000 1
Date:2008-10-23Hour:03User:000 0
Date:2008-10-23Hour:04User:000 1
Date:2008-10-23Hour:04User:000 0
Date:2008-10-23Hour:04User:000 1
Date:2008-10-23Hour:04User:000 1
Date:2008-10-23Hour:04User:000 1
Date:2008-10-23Hour:04User:000 1
Date:2008-10-23Hour:04User:000 0
Date:2008-10-23Hour:04User:000 1
Date:2008-10-23Hour:04User:000 0
Date:2008-10-23Hour:04User:000 1
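The MapReduce job must use the first string of each line as the key (e.g. Date:2008-10-23Hour:03User:001) and the number 1 or 0 as the value.
The reducer simply adds up the values for the same key (1 + 1 + 0 + 1 + 0 ...), and that's it.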
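A minimal, Hadoop-free sketch of what the job is meant to compute, assuming whitespace-separated "key value" lines like the ones above. The class and method names are hypothetical; a TreeMap stands in for the shuffle's grouping of values by key, and Map.merge plays the reducer's role of adding them up.

```java
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory version of the job: map each line to (key, value),
// group by key, and sum the values of each key.
class LocalKeySum {

    static Map<String, Integer> sumByKey(List<String> lines) {
        // TreeMap keeps keys sorted, similar to the sorted keys a reducer sees.
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                String key = itr.nextToken();                  // e.g. Date:2008-10-23Hour:02User:000
                int value = Integer.parseInt(itr.nextToken()); // 0 or 1
                sums.merge(key, value, Integer::sum);          // "reduce": add values of the same key
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "Date:2008-10-23Hour:02User:000 1",
                "Date:2008-10-23Hour:02User:000 0",
                "Date:2008-10-23Hour:02User:000 1",
                "Date:2008-10-23Hour:03User:000 1");
        System.out.println(sumByKey(lines));
    }
}
```

For these four sample lines the Hour:02 key sums to 2 and the Hour:03 key to 1.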
The problem is that the final values I get are far too large, and I have absolutely no idea why.
Here is the code:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapReduce {

    public static class KeyValueMapper
            extends Mapper<Object, Object, Text, IntWritable> {

        private IntWritable ValueDistanceFunction = new IntWritable();
        private Text DateHourUser = new Text();

        @Override
        public void map(Object key, Object value, Context context)
                throws IOException, InterruptedException {
            // value is one line of input, e.g. "Date:2008-10-23Hour:02User:000 1"
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                DateHourUser.set(itr.nextToken());                            // key: Date/Hour/User string
                ValueDistanceFunction.set(Integer.parseInt(itr.nextToken())); // value: 0 or 1
                context.write(DateHourUser, ValueDistanceFunction);
                // I print the results only to check them
                System.out.println(DateHourUser);
                System.out.println(ValueDistanceFunction);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Add up all values (0 or 1) emitted for this key
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
                System.out.println(sum);
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "KeyValue");
        job.setJarByClass(MapReduce.class);
        job.setMapperClass(KeyValueMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/home/ubuntu/workspace/FileGeneration/Input"));
        FileOutputFormat.setOutputPath(job, new Path("/home/ubuntu/workspace/FileGeneration/Output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
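The driver registers IntSumReducer both as the combiner and as the reducer. Hadoop may run a combiner zero, one or more times on the map output, so this is only safe when reducing partial results gives the same answer as reducing the raw values; integer addition has that property. The small check below (class name and the split of values into two map tasks are hypothetical) compares the two.

```java
import java.util.List;

// Hypothetical check: summing per-split partial sums (what the reducer sees
// after a combiner runs) gives the same total as summing every raw value.
class CombinerCheck {

    static int sum(List<Integer> values) {
        int s = 0;
        for (int v : values) {
            s += v;
        }
        return s;
    }

    public static void main(String[] args) {
        // One key's values, split across two hypothetical map tasks.
        List<Integer> split1 = List.of(1, 0, 1, 1);
        List<Integer> split2 = List.of(0, 1, 1);

        int direct = sum(List.of(1, 0, 1, 1, 0, 1, 1));         // reducer gets raw values
        int combined = sum(List.of(sum(split1), sum(split2))); // reducer gets combiner outputs
        System.out.println(direct + " " + combined);
    }
}
```

Both sums are 5 here, so reusing the summing reducer as a combiner does not change the result on its own.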
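And this is the wrong output:
Date:2008-10-23Hour:02User:000 16
Date:2008-10-23Hour:03User:000 6
Date:2008-10-23Hour:04User:000 14
The correct output should be:
Date:2008-10-23Hour:02User:000 8
Date:2008-10-23Hour:03User:000 3
Date:2008-10-23Hour:04User:000 7
The wrong results are exactly twice the correct ones.
Also, if I print the keys, the values (0 or 1) and the running sums during the computation, I get: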
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
0
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
0
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
0
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:03User:000
0
Date:2008-10-23Hour:03User:000
1
Date:2008-10-23Hour:03User:000
1
Date:2008-10-23Hour:03User:000
0
Date:2008-10-23Hour:03User:000
1
Date:2008-10-23Hour:03User:000
1
Date:2008-10-23Hour:03User:000
0
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
0
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
0
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
0
Date:2008-10-23Hour:04User:000
1
8 (this is correct)
3 (this is correct)
7 (this is correct)
Date:2008-10-23Hour:02User:000
0
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
0
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
0
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:02User:000
1
Date:2008-10-23Hour:03User:000
0
Date:2008-10-23Hour:03User:000
1
Date:2008-10-23Hour:03User:000
1
Date:2008-10-23Hour:03User:000
0
Date:2008-10-23Hour:03User:000
1
Date:2008-10-23Hour:03User:000
1
Date:2008-10-23Hour:03User:000
0
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
0
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
0
Date:2008-10-23Hour:04User:000
1
Date:2008-10-23Hour:04User:000
0
Date:2008-10-23Hour:04User:000
1
8
3
7
16 (wrong final value)
6 (wrong final value)
14 (wrong final value)
Thanks a lot.
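The trace above shows every mapped record printed twice, the per-key sums 8, 3 and 7 printed twice, and then the final 16, 6 and 14. The sketch below (class and method names are hypothetical) reproduces only that arithmetic: if the reducer receives the same per-key partial sum twice, every total doubles. It does not show why the records were processed twice.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the final reduce step: add up the partial sums
// that arrive for each key from the map side.
class DoubledInputSketch {

    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((k, v) -> totals.merge(k, v, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        // Per-key partial sums as printed in the trace (8, 3, 7).
        Map<String, Integer> partial = Map.of(
                "Date:2008-10-23Hour:02User:000", 8,
                "Date:2008-10-23Hour:03User:000", 3,
                "Date:2008-10-23Hour:04User:000", 7);
        // The same partial sums arriving twice, as the trace suggests.
        System.out.println(reduce(List.of(partial, partial)));
    }
}
```

With the partial sums arriving twice, the totals are 16, 6 and 14, matching the wrong output above.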
Best answer
The problem is in your Mapper code. Why are you reading the input inside the mapper?
These lines are the problem:
BufferedReader sc=new BufferedReader(new FileReader("/home/ubuntu/workspace/FileGeneration/Input/Input"));
String line;
while ((line=sc.readLine()) !=null){
StringTokenizer read= new StringTokenizer (line," ");
while (read.hasMoreTokens()){
You have already specified the input in the Driver class:
FileInputFormat.addInputPath(job, new Path("/home/ubuntu/workspace/FileGeneration/Input"));
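There is no need to read this input again in the mapper. The framework reads the file and passes each line to the Mapper; the line is contained in value. Your Mapper code should look like this: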
@Override
public void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
    // value is the current input line, supplied by the framework
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        DateHourUser.set(itr.nextToken());
        ValueDistanceFunction.set(Integer.parseInt(itr.nextToken()));
        context.write(DateHourUser, ValueDistanceFunction);
        // I print the results only to check them
        System.out.println(DateHourUser);
        System.out.println(ValueDistanceFunction);
    }
}
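The reason not to open the file inside map() can be illustrated without Hadoop. In this in-memory sketch (all names hypothetical, and assuming the quoted BufferedReader lines run inside map()), a loop stands in for the framework and calls map() once per input line, as the default TextInputFormat does. A mapper that ignores value and re-reads the whole input emits every record once per input line, so each sum is multiplied by the number of lines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory model: the loop in run() plays the framework,
// calling a map function once per input line.
class MapperCallSketch {

    // Correct mapper: parse only the line passed in as the value.
    static void mapLine(String value, List<String[]> out) {
        StringTokenizer itr = new StringTokenizer(value);
        while (itr.hasMoreTokens()) {
            out.add(new String[] {itr.nextToken(), itr.nextToken()});
        }
    }

    // Faulty mapper: ignores value and re-reads every input line on each call.
    static void mapRereadingInput(String value, List<String> allLines, List<String[]> out) {
        for (String line : allLines) {
            mapLine(line, out);
        }
    }

    // Reducer: sum the values emitted for each key.
    static Map<String, Integer> reduce(List<String[]> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String[] kv : pairs) {
            sums.merge(kv[0], Integer.parseInt(kv[1]), Integer::sum);
        }
        return sums;
    }

    static Map<String, Integer> run(List<String> input, boolean rereadInput) {
        List<String[]> emitted = new ArrayList<>();
        for (String line : input) { // one map call per input line
            if (rereadInput) {
                mapRereadingInput(line, input, emitted);
            } else {
                mapLine(line, emitted);
            }
        }
        return reduce(emitted);
    }

    public static void main(String[] args) {
        List<String> input = List.of("k 1", "k 0", "k 1");
        System.out.println(run(input, false)); // {k=2}
        System.out.println(run(input, true));  // {k=6}: 3 calls, each re-reading all 3 lines
    }
}
```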
EDIT:
I took your data and ran the program, and got the following results. I don't see a problem with either the data or the code:
E:\hdp\hadoop-2.7.1.2.3.0.0-2557\bin>hadoop fs -cat /user/mballur/Output/part-r-00000
Date:2008-10-23Hour:02User:000 8
Date:2008-10-23Hour:03User:000 4
Date:2008-10-23Hour:04User:000 7
The program itself works correctly.