通过ChainMapper可以将多个map类合并成一个map任务。

下面个这个例子没什么实际意思,但是很好的演示了ChainMapper的作用。

源文件
100tom90
101mary85
102kate60

map00的结果,过滤掉100的记录
101mary85
102kate60

map01的结果,过滤掉101的记录
102kate60

reduce结果
102kate60

import java.io.IOException;
import java.util.*;
import java.lang.String; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*; public class WordCount
{ public static class Map00 extends MapReduceBase implements Mapper
{ public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException
{ Text ft = new Text(“100″); if(!key.equals(ft))
{
output.collect(key, value);
}
}
} public static class Map01 extends MapReduceBase implements Mapper
{ public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException
{ Text ft = new Text(“101″); if(!key.equals(ft))
{
output.collect(key, value);
}
}
} public static class Reduce extends MapReduceBase implements Reducer
{
public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
{ while(values.hasNext())
{
output.collect(key, values.next());
} }
} public static void main(String[] args) throws Exception
{ JobConf conf = new JobConf(WordCount.class);
conf.setJobName(“wordcount00″); conf.setInputFormat(KeyValueTextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class); ChainMapper cm = new ChainMapper(); JobConf mapAConf = new JobConf(false);
cm.addMapper(conf, Map00.class, Text.class, Text.class, Text.class, Text.class, true, mapAConf); JobConf mapBConf = new JobConf(false);
cm.addMapper(conf, Map01.class, Text.class, Text.class, Text.class, Text.class, true, mapBConf); conf.setReducerClass(Reduce.class); conf00.setOutputKeyClass(Text.class);
conf00.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); }
}

总结:

  1.一句话:ChainMapper即在Reduce之前进行多次Mapper

  2.ChainMapper必须保证所有的子mapper输入输出是一致的!

  3.ChainMapper中的子mapper是线性执行的

05-11 14:02