import java.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.mapreduce.lib.partition.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.util.*;
/**
* Demonstrates how to use Total Order Partitioner on Word Count.
*/
public class TotalOrderPartitionerExample {
public static class WordCount extends Configured implements Tool {
private final static int REDUCE_TASKS = 8;
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new WordCount(), args);
System.exit(exitCode);
}
@Override @SuppressWarnings({ "unchecked", "rawtypes" })
public int run(String[] args) throws Exception {
// Check arguments.
if (args.length != 2) {
String usage =
"Usage: " +
"hadoop jar TotalOrderPartitionerExample$WordCount " +
"<input dir> <output dir>\n"
System.out.printf(usage);
System.exit(-1);
}
String jobName = "WordCount";
String mapJobName = jobName + "-Map";
String reduceJobName = jobName + "-Reduce";
// Get user args.
String inputDir = args[0];
String outputDir = args[1];
// Define input path and output path.
Path mapInputPath = new Path(inputDir);
Path mapOutputPath = new Path(outputDir + "-inter");
Path reduceOutputPath = new Path(outputDir);
// Define partition file path.
Path partitionPath = new Path(outputDir + "-part.lst");
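// The partition file will hold REDUCE_TASKS - 1 sorted split keys;
// TotalOrderPartitioner reads it to decide which reducer each key belongs to.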
// Configure map-only job for sampling.
Job mapJob = Job.getInstance(getConf());
mapJob.setJobName(mapJobName);
mapJob.setJarByClass(WordCount.class);
mapJob.setMapperClass(WordMapper.class);
mapJob.setNumReduceTasks(0);
mapJob.setOutputKeyClass(Text.class);
mapJob.setOutputValueClass(IntWritable.class);
TextInputFormat.setInputPaths(mapJob, mapInputPath);
// Set the output format to a sequence file.
mapJob.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(mapJob, mapOutputPath);
// Submit the map-only job.
int exitCode = mapJob.waitForCompletion(true) ? 0 : 1;
if (exitCode != 0) { return exitCode; }
// Set up the second job, the reduce-only.
Job reduceJob = Job.getInstance(getConf());
reduceJob.setJobName(reduceJobName);
reduceJob.setJarByClass(WordCount.class);
// Set the input to the previous job's output.
reduceJob.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.setInputPaths(reduceJob, mapOutputPath);
// Set the output path to the final output path.
TextOutputFormat.setOutputPath(reduceJob, reduceOutputPath);
// Use identity mapper for key/value pairs in SequenceFile.
reduceJob.setReducerClass(IntSumReducer.class);
reduceJob.setMapOutputKeyClass(Text.class);
reduceJob.setMapOutputValueClass(IntWritable.class);
reduceJob.setOutputKeyClass(Text.class);
reduceJob.setOutputValueClass(IntWritable.class);
reduceJob.setNumReduceTasks(REDUCE_TASKS);
// Use Total Order Partitioner.
reduceJob.setPartitionerClass(TotalOrderPartitioner.class);
// Generate partition file from map-only job's output.
TotalOrderPartitioner.setPartitionFile(
reduceJob.getConfiguration(), partitionPath);
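// RandomSampler(freq, numSamples): keys are sampled with probability freq
// until numSamples keys have been collected; writePartitionFile then writes
// REDUCE_TASKS - 1 split keys taken from the sorted sample.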
InputSampler.writePartitionFile(reduceJob,
new InputSampler.RandomSampler(1.0, 10000));
// Submit the reduce job.
return reduceJob.waitForCompletion(true) ? 0 : 2;
}
}
public static class WordMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
for (String word : line.split("\\W+")) {
if (word.length() == 0) { continue; }
context.write(new Text(word), new IntWritable(1));
}
}
}
}
I got this code from GitHub.
I compared the map and reduce elapsed times.
The regular word count performs better than the TotalOrderPartitioner version.
Why is that?
Are any optimizations or changes needed to bring it to comparable performance?
HashPartitioner performance vs. TotalOrderPartitioner performance?
Best answer
Yes, HashPartitioner will outperform TotalOrderPartitioner, because it has none of the overhead of running the InputSampler, writing the partition file, and so on.
TotalOrderPartitioner is used only when you need globally sorted output, and it is slower than HashPartitioner.
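For comparison, the regular word count being measured is a single job that relies on the default HashPartitioner, so there is no sampling pass, no intermediate SequenceFile job, and no partition file. A minimal sketch of such a driver, assuming the WordMapper and imports above (the class name and argument handling are illustrative, not from the original post):

public static class PlainWordCount extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJobName("PlainWordCount");
    job.setJarByClass(PlainWordCount.class);
    job.setMapperClass(WordMapper.class);
    job.setReducerClass(IntSumReducer.class);
    // HashPartitioner is already the default; set it explicitly only to highlight the comparison.
    job.setPartitionerClass(HashPartitioner.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(8); // same number of reducers as the TotalOrderPartitioner version
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

Each reducer's output is still sorted within its own part file, but the files are not range-ordered across reducers; that global ordering is the only thing the TotalOrderPartitioner version buys for its extra cost.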
Regarding hadoop - Hadoop Total Order Partitioner, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34950219/