在hadoop 中一个Job中可以按顺序运行多个mapper对数据进行前期的处理,再进行reduce,经reduce后的结果可经个经多个按顺序执行的mapper进行后期的处理,这样的Job是不会保存中间结果的,并大大减少了I/O操作。
例如:在一个Job中,按顺序执行 MAP1->MAP2->REDUCE->MAP3->MAP4 在这种链式结构中,要将MAP2与REDUCE看成这个MAPREDUCE的核心部分(就像是单个中的MAP与REDUCE),并且partitioning与shuffling在此处才会被应用到。所以MAP1作为前期处理,而MAP3与MAP4作为后期处理。
plaincopy
- Configuration conf = getConf();
- JobConf job = new JobConf(conf);
- job.setJobName(“ChainJob”);
- job.setInputFormat(TextInputFormat.class);
- job.setOutputFormat(TextOutputFormat.class);
- FileInputFormat.setInputPaths(job, in);
- FileOutputFormat.setOutputPath(job, out);
- JobConf map1Conf = new JobConf(false);
- ChainMapper.addMapp(job,
- Map1.class,
- LongWritable.class,
- Text.class,
- Text.class,
- Text.class,
- true,
- map1Conf);
- //将map1加入到Job中
- JobConf map2Conf = new JobConf(false);
- ChainMapper.addMapper(job,
- BMap.class,
- Text.class,
- Text.class,
- LongWritable.class,
- Text.class,
- true,
- map2Conf);
- /将map2加入到Job中
- JobConf reduceConf = new JobConf(false);
- ChainReducer.setReducer(job,
- Reduce.class,
- LongWritable.class,
- Text.class,
- Text.class,
- Text.class,
- true,
- reduceConf);
- /将reduce加入到Job中
- JobConf map3Conf = new JobConf(false);
- ChainReducer.addMapper(job,
- Map3.class,
- Text.class,
- Text.class,
- LongWritable.class,
- Text.class,
- true,
- map3Conf);
- /将map3加入到Job中
- JobConf map4Conf = new JobConf(false);
- ChainReducer.addMapper(job,
- Map4.class,
- LongWritable.class,
- Text.class,
- LongWritable.class,
- Text.class,
- true,
- map4Conf);
- //将map4加入到Job中
- JobClient.runJob(job);
- 注:上一个的输出是一下的输入,所以上一个的输出数据类型必须与下一个输入的数据类型一样
***************************************************
addMapper中的参数
public static <K1,V1,K2,V2> void
addMapper(JobConf job,
Class<? extends Mapper<K1,V1,K2,V2>> klass,
Class<? extends K1> inputKeyClass,
Class<? extends V1> inputValueClass,
Class<? extends K2> outputKeyClass,
Class<? extends V2> outputValueClass,
boolean byValue,
JobConf mapperConf)