在Hadoop中实现全排序有如下三种方法:
1. 只使用一个reducer
2. 自定义partitioner
3. 使用TotalOrderPartitioner
其中第一种方法显然违背了mapreduce分布式编程的初衷,在数据量大的情况下并不适用。第二种方法的问题在于开发人员需要预先知道输入数据集的取值分布,不然无法保证每一个reducer的负载均衡。这里我们简单介绍下第三种方法。
package SortTest; import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; public class TotalSort extends Configured implements Tool { public static class MapperTest extends Mapper<LongWritable, Text, LongWritable, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
LongWritable first = new LongWritable(Integer.parseInt(split[0]));
Text second = new Text(split[1]);
context.write(first, second); }
} public static class ReducerTest extends Reducer<LongWritable, Text, LongWritable, Text> {
public void reduce(LongWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
context.write(key, value);
}
}
} public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new TotalSort(), args);
System.exit(res);
} static final String INPUT = "/home/sort_in";
static final String OUTPUT = "/home/sort_out"; @Override
public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoop001:9001");
Job job = Job.getInstance(conf, "TotalSort"); FileInputFormat.addInputPath(job, new Path(INPUT));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT)); job.setNumReduceTasks(3);
job.setJarByClass(TotalSort.class);
job.setMapperClass(MapperTest.class);
job.setReducerClass(ReducerTest.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class); InputSampler.RandomSampler<LongWritable, Text> sampler =
new InputSampler.RandomSampler<LongWritable, Text>(1,10,2); // 告诉hadoop分布式缓存文件放在哪里好
Path cachePath = new Path("/home/partition/pfile");
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), cachePath);
InputSampler.writePartitionFile(job, sampler);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
} }