我正在学习Hadoop。
我正在尝试使用map reduce解决以下问题:
给定一个文件,其中每条记录包含一名医生和他所治疗的一名患者。请找出治疗记录数最多的前两名医生(按记录条数统计,同一患者重复出现时重复计数)。
例如,
输入:
医生 病人
D4 P1
D2 P2
D3 P3
D4 P4
D1 P5
D4 P1
D2 P2
D3 P1
D2 P5
D2 P6
D2 P1
D2 P5
D4 P2
D2 P1
输出:
D2 7
D4 4
我已经能够列出每位医生的记录数,但是不知道如何让MapReduce只输出前两名医生。
package com.doctors;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * MapReduce job that counts how many input records share each key.
 *
 * <p>The mapper emits {@code (key, 1)} for every record and the reducer adds those ones up,
 * writing one {@code (count, key)} pair per distinct key. The job lists every key; it does not
 * rank or truncate the result.
 */
public class TopDoctors {

  /** Emits {@code (key, 1)} for every input record; the record's value is ignored. */
  public static class Mapper extends
      org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, IntWritable> {

    /** Constant count written for each record; never mutated, so it is safe to share. */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Writes the record's key with a count of one.
     *
     * @param key key part of the input record
     * @param value value part of the input record (unused)
     * @param context task context that receives the {@code (key, 1)} pair
     * @throws IOException if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(
        Text key,
        Text value,
        org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
      context.write(key, ONE);
    }
  }

  /** Sums the counts collected for one key and writes {@code (total, key)}. */
  public static class Reducer
      extends org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, IntWritable, Text> {

    /**
     * Adds up every count received for {@code key}.
     *
     * @param key key whose counts are being totalled
     * @param value counts emitted by the mappers for this key
     * @param context task context that receives the {@code (total, key)} pair
     * @throws IOException if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(
        Text key,
        Iterable<IntWritable> value,
        org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, IntWritable, Text>.Context context)
        throws IOException, InterruptedException {
      int total = 0;
      for (IntWritable count : value) {
        total += count.get();
      }
      context.write(new IntWritable(total), key);
    }
  }

  /**
   * Configures and runs the job, exiting with 0 on success and 1 on failure.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the output path
   * @throws IOException if the job cannot be set up or submitted
   * @throws ClassNotFoundException if a configured job class cannot be loaded
   * @throws InterruptedException if the wait for completion is interrupted
   */
  public static void main(String[] args) throws IOException,
      ClassNotFoundException, InterruptedException {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      System.err.println("Usage: TopDoctors <input path> <output path>");
      System.exit(2);
    }

    Job job = Job.getInstance();
    job.setJarByClass(TopDoctors.class);
    job.setMapperClass(Mapper.class);
    job.setReducerClass(Reducer.class);

    // The map output types differ from the final output types, so declare both explicitly.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // These must match what Reducer actually writes: (IntWritable total, Text key).
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    // NOTE(review): KeyValueTextInputFormat splits each line at the first separator, which is
    // a tab unless configured otherwise - confirm the input is tab-separated, otherwise the
    // whole line becomes the key.
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
请澄清。
最佳答案
我认为这可以解决您的问题
在reduce方法中把每位医生的记录总数和医生编号放入一个TreeMap,然后在cleanup方法中统一输出结果。
public static class Reducer ......
#In your reduce class create a TreeMap
TreeMap<Integer, String> TopN = new TreeMap<Integer, String>();
protected void reduce(......){
#here is your get sum code
if (TopN.size() < 3) { //TopN
if (TopN.get(sum) != null)
{ TopN.put(sum, TopN.get(sum) + "-----" + key.toString());
} else {
TopN.put(sum, key.toString());
}
} else {
// if map.size> N add one & remove one
if (TopN.get(sum) != null) {
TopN.put(sum, TopN.get(sum) + "------" + key.toString());
//
} else {
TopN.put(sum, key.toString());
TopN.remove(TopN.firstKey());
}
} // when you put your data in treemap, it will sorted by itself;
}
@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException {
if (TopN != null && !TopN.isEmpty()) {
Set<Integer> keys = TopN.keySet();
for (Integer key : keys) { outkey.set(TopN.get(key));
outvalue.set(key);
context.write(outkey, outvalue);
} } } }