在hadoop 中一个Job中可以按顺序运行多个mapper对数据进行前期的处理,再进行reduce,经reduce后的结果可经个经多个按顺序执行的mapper进行后期的处理,这样的Job是不会保存中间结果的,并大大减少了I/O操作。

例如:在一个Job中,按顺序执行 MAP1->MAP2->REDUCE->MAP3->MAP4 在这种链式结构中,要将MAP2与REDUCE看成这个MAPREDUCE的核心部分(就像是单个中的MAP与REDUCE),并且partitioning与shuffling在此处才会被应用到。所以MAP1作为前期处理,而MAP3与MAP4作为后期处理。

  1. Configuration conf = getConf();
  2. JobConf job = new JobConf(conf);
  3. job.setJobName(“ChainJob”);
  4. job.setInputFormat(TextInputFormat.class);
  5. job.setOutputFormat(TextOutputFormat.class);
  6. FileInputFormat.setInputPaths(job, in);
  7. FileOutputFormat.setOutputPath(job, out);
  8. JobConf map1Conf = new JobConf(false);
  9. ChainMapper.addMapp(job,
  10. Map1.class,
  11. LongWritable.class,
  12. Text.class,
  13. Text.class,
  14. Text.class,
  15. true,
  16. map1Conf);
  17. //将map1加入到Job中
  18. JobConf map2Conf = new JobConf(false);
  19. ChainMapper.addMapper(job,
  20. BMap.class,
  21. Text.class,
  22. Text.class,
  23. LongWritable.class,
  24. Text.class,
  25. true,
  26. map2Conf);
  27. /将map2加入到Job中
  28. JobConf reduceConf = new JobConf(false);
  29. ChainReducer.setReducer(job,
  30. Reduce.class,
  31. LongWritable.class,
  32. Text.class,
  33. Text.class,
  34. Text.class,
  35. true,
  36. reduceConf);
  37. /将reduce加入到Job中
  38. JobConf map3Conf = new JobConf(false);
  39. ChainReducer.addMapper(job,
  40. Map3.class,
  41. Text.class,
  42. Text.class,
  43. LongWritable.class,
  44. Text.class,
  45. true,
  46. map3Conf);
  47. /将map3加入到Job中
  48. JobConf map4Conf = new JobConf(false);
  49. ChainReducer.addMapper(job,
  50. Map4.class,
  51. LongWritable.class,
  52. Text.class,
  53. LongWritable.class,
  54. Text.class,
  55. true,
  56. map4Conf);
  57. //将map4加入到Job中
  58. JobClient.runJob(job);
  59. 注:上一个的输出是一下的输入,所以上一个的输出数据类型必须与下一个输入的数据类型一样

***************************************************

addMapper中的参数

public static <K1,V1,K2,V2> void

addMapper(JobConf job,

                    Class<? extends Mapper<K1,V1,K2,V2>> klass,

                    Class<? extends K1> inputKeyClass,

                    Class<? extends V1> inputValueClass,

                    Class<? extends K2> outputKeyClass,

                    Class<? extends V2> outputValueClass,

                    boolean byValue,

                    JobConf mapperConf)

05-23 08:29