分区主要原理是,自定义分区类继承Partitioner,根据业务需求实现分区函数 public int getPartition(Text key, Text value, int numPartitions),将Key相同的记录,划分到同一reduce函数中。需要注意的是如果在驱动程序中将NumReduceTasks值设置为1时, 不会执行分区函数。这个可以理解,毕竟只有1个reduce,所以,没有必要执行Partitioner
具体实现如下:
完整代码如下:
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Partitioner;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
- public class PartitionJob {
- public static class PartitionMapper extends Mapper<LongWritable ,Text, Text,Text>{
- private Text tmpKey = new Text();
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String data = value.toString();
- String []arrays = data.split("\t");
- tmpKey.set(arrays[0]);
- System.out.println("data:"+data);
- context.write(tmpKey, value);
- }
- }
- public static class PartitionReduce extends Reducer<Text, Text, NullWritable, Text>{
- private Text out = new Text();
- @Override
- protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- String str="";
- for (Text val:values){
- context.write(NullWritable.get(), val);
- }
- }
- }
- public static class Partition extends Partitioner<Text ,Text>{
- @Override
- public int getPartition(Text key, Text value, int numPartitions) {
- return Math.abs(key.hashCode())%numPartitions;
- }
- }
- public static void main(String []args){
- try {
- Job job = Job.getInstance();
- job.setJobName("PartitionJob");
- job.setJarByClass(PartitionJob.class);
- job.setMapperClass(PartitionMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- job.setReducerClass(PartitionReduce.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
- job.setPartitionerClass(Partition.class);
- job.setNumReduceTasks(2);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);
- System.out.println(job.waitForCompletion(true));
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
- }