import java.io.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.mapreduce.lib.partition.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.util.*;

/**
 * Demonstrates how to use the TotalOrderPartitioner with Word Count:
 * a map-only sampling pass followed by a globally sorted reduce pass.
 */
public class TotalOrderPartitionerExample {
  public static class WordCount extends Configured implements Tool {
    private final static int REDUCE_TASKS = 8;

    public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new WordCount(), args);
      System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
      // Check arguments.
      if (args.length != 2) {
        String usage =
            "Usage: hadoop jar <jar file> " +
            "TotalOrderPartitionerExample$WordCount " +
            "<input dir> <output dir>\n";
        System.err.print(usage);
        return -1;
      }

      String jobName = "WordCount";
      String mapJobName = jobName + "-Map";
      String reduceJobName = jobName + "-Reduce";

      // Get user args.
      String inputDir = args[0];
      String outputDir = args[1];

      // Define input path and output path.
      Path mapInputPath = new Path(inputDir);
      Path mapOutputPath = new Path(outputDir + "-inter");
      Path reduceOutputPath = new Path(outputDir);

      // Define partition file path.
      Path partitionPath = new Path(outputDir + "-part.lst");
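
      // Why two jobs? InputSampler draws keys from the job's configured
      // InputFormat. Sampling the raw text input would yield LongWritable
      // byte offsets rather than the Text words the final job partitions
      // on, so a map-only pass first materializes (word, 1) pairs in a
      // SequenceFile for the sampler to read.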

      // Configure map-only job for sampling.
      Job mapJob = Job.getInstance(getConf());
      mapJob.setJobName(mapJobName);
      mapJob.setJarByClass(WordCount.class);
      mapJob.setMapperClass(WordMapper.class);
      mapJob.setNumReduceTasks(0);
      mapJob.setOutputKeyClass(Text.class);
      mapJob.setOutputValueClass(IntWritable.class);
      TextInputFormat.setInputPaths(mapJob, mapInputPath);

      // Set the output format to a sequence file.
      mapJob.setOutputFormatClass(SequenceFileOutputFormat.class);
      SequenceFileOutputFormat.setOutputPath(mapJob, mapOutputPath);

      // Submit the map-only job.
      int exitCode = mapJob.waitForCompletion(true) ? 0 : 1;
      if (exitCode != 0) { return exitCode; }

      // Set up the second job, the reduce-only.
      Job reduceJob = Job.getInstance(getConf());
      reduceJob.setJobName(reduceJobName);
      reduceJob.setJarByClass(WordCount.class);

      // Set the input to the previous job's output.
      reduceJob.setInputFormatClass(SequenceFileInputFormat.class);
      SequenceFileInputFormat.setInputPaths(reduceJob, mapOutputPath);

      // Set the output path to the final output path.
      TextOutputFormat.setOutputPath(reduceJob, reduceOutputPath);

      // No mapper class is set, so the identity Mapper passes the
      // SequenceFile's Text/IntWritable pairs straight through.
      reduceJob.setReducerClass(IntSumReducer.class);
      reduceJob.setMapOutputKeyClass(Text.class);
      reduceJob.setMapOutputValueClass(IntWritable.class);
      reduceJob.setOutputKeyClass(Text.class);
      reduceJob.setOutputValueClass(IntWritable.class);
      reduceJob.setNumReduceTasks(REDUCE_TASKS);

      // Use TotalOrderPartitioner: each reducer receives a contiguous key
      // range, so concatenating the output files in order yields a globally
      // sorted result.
      reduceJob.setPartitionerClass(TotalOrderPartitioner.class);

      // Generate the partition file by sampling the map-only job's output.
      // The file must be written before the job is submitted.
      TotalOrderPartitioner.setPartitionFile(
          reduceJob.getConfiguration(), partitionPath);
      // freq = 1.0 considers every record; numSamples = 10000 caps how many
      // keys are kept. Sampling runs in the client before the job starts.
      InputSampler.writePartitionFile(reduceJob,
          new InputSampler.RandomSampler<Text, IntWritable>(1.0, 10000));

      // Submit the reduce job.
      return reduceJob.waitForCompletion(true) ? 0 : 2;
    }
  }

  public static class WordMapper extends
      Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text wordText = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Split on runs of non-word characters and emit (word, 1),
      // reusing the Writable instances to avoid per-record allocation.
      for (String word : value.toString().split("\\W+")) {
        if (word.isEmpty()) { continue; }
        wordText.set(word);
        context.write(wordText, ONE);
      }
    }
  }

}
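
A typical invocation, assuming the compiled classes are packaged into a jar named totalorder.jar (the jar name is illustrative); note the quotes, which stop the shell from expanding $WordCount as a variable:

  hadoop jar totalorder.jar 'TotalOrderPartitionerExample$WordCount' <input dir> <output dir>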

I got this code from GitHub. I compared the elapsed times of the map and
reduce phases: the regular word count performs better than the total order
partitioner version. Why is that? Are any optimizations or changes needed to
bring it up to comparable performance? How does HashPartitioner performance
compare with TotalOrderPartitioner performance?

Best Answer

Yes, HashPartitioner will outperform TotalOrderPartitioner, because HashPartitioner carries none of the extra overhead: it does not run the InputSampler, write a partition file, or need the extra map-only pass that materializes the intermediate SequenceFile.
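
If the sampling cost is what dominates, one mitigation is to sample less aggressively. A minimal sketch, assuming the same Text/IntWritable intermediate data as in the listing above; the freq, numSamples, and maxSplitsSampled values are illustrative, not tuned:

      // Inside run(), in place of the sampler call above: consider roughly
      // 1% of records, keep at most 1,000 samples, and read at most 10
      // input splits while sampling.
      InputSampler.RandomSampler<Text, IntWritable> sampler =
          new InputSampler.RandomSampler<Text, IntWritable>(0.01, 1000, 10);
      InputSampler.writePartitionFile(reduceJob, sampler);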

TotalOrderPartitioner is only worth using when you need globally sorted output, that is, when concatenating the reducers' output files in order must yield one fully sorted result; it will always be slower than HashPartitioner.
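
For comparison, the baseline the question measures against is a single job with the default HashPartitioner: no sampling, no partition file, and no intermediate SequenceFile. A minimal sketch reusing the WordMapper from the listing above (the class name and reducer count are illustrative; the combiner is an optional optimization that cuts shuffle volume):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class PlainWordCount {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "PlainWordCount");
    job.setJarByClass(PlainWordCount.class);
    job.setMapperClass(TotalOrderPartitionerExample.WordMapper.class);
    job.setCombinerClass(IntSumReducer.class); // pre-aggregate map output
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(8);
    // No setPartitionerClass(...): the default HashPartitioner spreads keys
    // by hash, so each reducer's output is sorted only locally, not
    // globally across files.
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}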

Regarding hadoop - Hadoop Total Order Partitioner, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34950219/
