我正在学习Hadoop。
我正在尝试使用map reduce解决以下问题:

给定一个包含医生和所治疗患者的记录的文件。查找治疗人数最多的前两名医生。

例如,

输入:

医生 病人

D4 P1

D2 P2

D3 P3

D4 P4

D1 P5

D4 P1

D2 P2

D3 P1

D2 P5

D2 P6

D2 P1

D2 P5

D4 P2

D2 P1

输出:

D2 7

D4 4

我已经能够统计并列出每位医生的就诊记录数,但不知道如何用MapReduce只取出排名前两位的医生。

package com.doctors;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * MapReduce job that counts how many records each doctor appears in.
 *
 * <p>Input is read through {@link KeyValueTextInputFormat}; the mapper treats the record key as
 * the doctor id and ignores the record value. The reducer sums the occurrences per doctor and
 * writes one {@code (count, doctor)} pair per doctor through {@link TextOutputFormat}.
 *
 * <p>NOTE(review): {@code KeyValueTextInputFormat} splits each line on a tab by default. If the
 * input is space-separated, the separator has to be configured
 * ({@code mapreduce.input.keyvaluelinerecordreader.key.value.separator}) — confirm against the
 * real input files.
 */
public class TopDoctors {

    /** Emits {@code (doctor, 1)} for every input record. */
    public static class Mapper extends
            org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, IntWritable> {

        /** Shared constant; the framework serializes it on every write, so reuse is safe. */
        private static final IntWritable ONE = new IntWritable(1);

        /**
         * Writes the record key with a count of one.
         *
         * @param key     record key (the doctor id)
         * @param value   record value (unused)
         * @param context task context used to emit the pair
         */
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, ONE);
        }
    }

    /** Sums the per-doctor counts and emits {@code (total, doctor)}. */
    public static class Reducer
            extends
            org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, IntWritable, Text> {

        /**
         * Adds up all counts received for one doctor and writes the total.
         *
         * @param key     the doctor id
         * @param value   the counts emitted by the mappers for this doctor
         * @param context task context used to emit the pair
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : value) {
                sum += count.get();
            }
            context.write(new IntWritable(sum), key);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     */
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: TopDoctors <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance();

        job.setJarByClass(TopDoctors.class);

        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);

        // Declare the intermediate and final types explicitly so they match the
        // generic parameters of Mapper and Reducer above; the map-output key type
        // would otherwise fall back to the final output key type.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

请澄清。

最佳答案

我认为这可以解决您的问题
将每位医生的计数总和与医生编号存入一个TreeMap中,并在cleanup方法中把结果写出:

  public static class Reducer  ......

#In your reduce class create a TreeMap
 TreeMap<Integer, String> TopN = new TreeMap<Integer, String>();


protected void reduce(......){
   #here  is  your  get sum code

 if (TopN.size() < 3) {  //TopN
     if (TopN.get(sum) != null)
     { TopN.put(sum, TopN.get(sum) + "-----" + key.toString());
    } else {
     TopN.put(sum, key.toString());
     }
} else {
 // if map.size> N  add one  & remove one
 if (TopN.get(sum) != null) {
    TopN.put(sum, TopN.get(sum) + "------" + key.toString());
     //
    } else {
     TopN.put(sum, key.toString());
    TopN.remove(TopN.firstKey());
     }
     } // when you put your data in treemap, it will sorted by itself;
     }
 @Override
 protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException {
if (TopN != null && !TopN.isEmpty()) {
     Set<Integer> keys = TopN.keySet();
    for (Integer key : keys) { outkey.set(TopN.get(key));
         outvalue.set(key);
        context.write(outkey, outvalue);
                             } } } }

08-28 22:03