1 Shuffle
step1 (input)
- Read the data
- Convert it into <key,value> pairs
- TextInputFormat (the default InputFormat)
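A plain-Java sketch (no Hadoop dependency) of the <key,value> pairs TextInputFormat hands to the mapper: the key is the byte offset where a line starts and the value is the line itself. `LineRecordsDemo` is a hypothetical name. The sketch assumes UTF-8 text with `\n` line endings and ignores `\r\n` and input-split boundaries, which Hadoop's real record reader also handles.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustration only: mimics <byte offset, line> records, not Hadoop's LineRecordReader itself.
class LineRecordsDemo {
    static List<Map.Entry<Long, String>> toRecords(String text) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        int start = 0; // byte offset of the current line
        for (int i = 0; i <= bytes.length; i++) {
            boolean atEnd = (i == bytes.length);
            if (atEnd || bytes[i] == '\n') {
                // no extra record after a final '\n' (or for empty input)
                if (atEnd && i == start) break;
                String line = new String(bytes, start, i - start, StandardCharsets.UTF_8);
                records.add(Map.entry((long) start, line)); // KEYIN = offset, VALUEIN = line
                start = i + 1;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, String> r : toRecords("hello world\nhello hadoop\n")) {
            System.out.println(r.getKey() + " -> " + r.getValue());
        }
    }
}
```

Running `main` prints `0 -> hello world` and `12 -> hello hadoop`: the second line starts after 11 characters plus one newline byte.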
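step2 (map)
- Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: map(KEYIN key, VALUEIN value, Context context) emits <KEYOUT, VALUEOUT> pairs
- By default (TextInputFormat): KEYIN is LongWritable (the byte offset of the line), VALUEIN is Text (the line contents)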
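To make the map step concrete, here is a plain-Java stand-in for a WordCount-style map(). WordCount is an illustrative choice (the notes do not fix a particular job), and `WordCountMapDemo` is a hypothetical name: the input key (line offset) is ignored, the line is split on whitespace, and each word is emitted as <word, 1>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustration only: the same logic a WordCount Mapper would run per record.
class WordCountMapDemo {
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {            // skips the "" produced by blank lines
                out.add(Map.entry(word, 1));  // KEYOUT = word, VALUEOUT = 1
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(map(0L, "hello world hello"));
    }
}
```

In a real Hadoop Mapper the loop body would instead call `context.write(new Text(word), new IntWritable(1))`.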
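step3 (shuffle)
process
map output <key,value>
* written to an in-memory buffer
* spill: when the buffer is nearly full, its contents are spilled (written) to disk, which can produce many files; each spill is (1) partitioned and (2) sorted
many small spill files
* merge
* sort
one large file per Map Task
* copy: each Reduce Task fetches the data it has to process from the machines where the Map Tasks ran
* merge, sort
* group: values with the same key are put together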
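The partition, sort and group steps above can be simulated in plain Java. This is a sketch, not the framework: spilling, merging and the network copy are left out, and `ShuffleDemo` is a hypothetical name. The partition function uses the same formula as Hadoop's default HashPartitioner, `(hashCode & Integer.MAX_VALUE) % numReduceTasks`, but applied to `String.hashCode()`; Hadoop's `Text` hashes its bytes differently, so real partition numbers will not match this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustration only: partition -> sort -> group for <word, 1> pairs.
class ShuffleDemo {

    // HashPartitioner's formula, but on String.hashCode() (assumption, see lead-in).
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // One TreeMap per reduce task: keys stay sorted (sort step) and all values
    // of a key are collected into one list (group step).
    static List<SortedMap<String, List<Integer>>> shuffle(
            List<Map.Entry<String, Integer>> mapOutput, int numReduceTasks) {
        List<SortedMap<String, List<Integer>>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new TreeMap<>());
        }
        for (Map.Entry<String, Integer> kv : mapOutput) {
            int p = partition(kv.getKey(), numReduceTasks);
            partitions.get(p)
                    .computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                    .add(kv.getValue());
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("hello", 1), Map.entry("world", 1), Map.entry("hello", 1));
        List<SortedMap<String, List<Integer>>> parts = shuffle(mapOutput, 2);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("reduce task " + i + ": " + parts.get(i));
        }
    }
}
```

Each reduce task then sees its keys in sorted order, each with the full list of values for that key.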
step4 (reduce)
step5 (output)
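- OutputFormat
- FileOutputFormat, TextOutputFormat (the default)
- Each key-value pair is written as one line, with \t separating the key and the value
- By default the key's and the value's toString() methods are called to produce the text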
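A plain-Java sketch of the last two steps: a summing reduce(), as in the hypothetical WordCount job, followed by the line TextOutputFormat writes for each pair, `key.toString() + "\t" + value.toString()` plus a newline. `ReduceOutputDemo` is a hypothetical name. In Hadoop the separator can be changed through the `mapreduce.output.textoutputformat.separator` property.

```java
// Illustration only: reduce = sum of the grouped values; output = one tab-separated line.
class ReduceOutputDemo {
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Mirrors TextOutputFormat's default layout: key, '\t', value, '\n'.
    static String formatLine(Object key, Object value) {
        return key.toString() + "\t" + value.toString() + "\n";
    }

    public static void main(String[] args) {
        int total = reduce("hello", java.util.List.of(1, 1));
        System.out.print(formatLine("hello", total));
    }
}
```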
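1.1 Summary
- Partitioning: Partitioner
- Sorting
- Copy: handled by the framework; the user cannot customize it
- Grouping
- Optional, user-configurable:
* compression
* combiner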
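A combiner is a local, map-side reduce over one map task's output, run before the data is copied to the reducers, so fewer records cross the network. The plain-Java sketch below (hypothetical `CombinerDemo`) assumes WordCount-style summing. Summing is safe here because addition is associative and commutative; Hadoop may run a combiner zero, one or several times, so it must never change the final result. When those conditions hold and the types match, the reducer class itself is often passed to `job.setCombinerClass(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: collapse repeated <word, n> pairs from one map task into one pair per word.
class CombinerDemo {
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, Integer> sums = new TreeMap<>(); // sorted, like map output after the sort step
        for (Map.Entry<String, Integer> kv : mapOutput) {
            sums.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        return new ArrayList<>(sums.entrySet());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("hello", 1), Map.entry("world", 1), Map.entry("hello", 1));
        System.out.println(mapOutput.size() + " records before, "
                + combine(mapOutput).size() + " after combining");
    }
}
```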
2 How to configure the 5 steps of the MapReduce Shuffle in a Job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

// Skeleton (template) MapReduce job showing where each shuffle step is configured.
public class ModelMapReduce extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Shuffle step 5: compress the map output
        conf.set("mapreduce.map.output.compress", "true");
        // Must be a fully qualified CompressionCodec class name.
        // DefaultCodec (zlib) needs no native libraries; SnappyCodec or Lz4Codec are faster options.
        conf.set("mapreduce.map.output.compress.codec",
                "org.apache.hadoop.io.compress.DefaultCodec");
        int status = ToolRunner.run(conf, new ModelMapReduce(), args);
        System.exit(status);
    }

    public static class ModelMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // per-record logic; emit results with context.write(outKey, outValue)
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // called once per task, before the first map()
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // called once per task, after the last map()
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            // Same control flow as the default Mapper.run(); an empty body would never call map().
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    public static class ModelReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // called once per key group: the key plus all of its values
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // called once per task, before the first reduce()
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // called once per task, after the last reduce()
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            // Mirrors the default Reducer.run() control flow; an empty body would never call reduce().
            setup(context);
            try {
                while (context.nextKey()) {
                    reduce(context.getCurrentKey(), context.getValues(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // map
        job.setMapperClass(ModelMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //********************* Shuffle *********************
        // 1. partitioner: which reduce task receives each key (default: HashPartitioner)
        // job.setPartitionerClass(...);
        // 2. sort: order of keys within each partition
        // job.setSortComparatorClass(...);
        // 3. combiner [optional]: map-side local reduce
        // job.setCombinerClass(...);
        // 4. group: which sorted keys share one reduce() call
        // job.setGroupingComparatorClass(...);
        // 5. compression: configured in main() via mapreduce.map.output.compress*
        //********************* Shuffle *********************

        // reduce
        job.setReducerClass(ModelReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }
}
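The difference between the sort comparator (step 2) and the grouping comparator (step 4) can be shown in plain Java. The hypothetical key below is (year, temperature): sorting uses both fields (year ascending, temperature descending), while grouping compares only the year, so each "reduce() call" receives one year with its highest temperature first. `SortAndGroupDemo` and `Key` are illustrative names; in Hadoop these would be comparator classes passed to `setSortComparatorClass` and `setGroupingComparatorClass`, and the partitioner would also have to partition on the year alone so that all records of one year reach the same reducer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustration only: sort by the full key, then group consecutive keys by part of the key.
class SortAndGroupDemo {
    static final class Key {
        final int year;
        final int temp;
        Key(int year, int temp) { this.year = year; this.temp = temp; }
        @Override public String toString() { return "(" + year + "," + temp + ")"; }
    }

    // Sort comparator: year ascending, then temperature descending.
    static final Comparator<Key> SORT = Comparator.comparingInt((Key k) -> k.year)
            .thenComparing(Comparator.comparingInt((Key k) -> k.temp).reversed());

    // Grouping comparator: keys with the same year belong to one reduce() call.
    static final Comparator<Key> GROUP = Comparator.comparingInt((Key k) -> k.year);

    // Returns one list per reduce() call, in the order those calls would happen.
    static List<List<Key>> sortAndGroup(List<Key> keys) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<List<Key>> groups = new ArrayList<>();
        for (Key k : sorted) {
            boolean newGroup = groups.isEmpty()
                    || GROUP.compare(groups.get(groups.size() - 1).get(0), k) != 0;
            if (newGroup) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(k);
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(sortAndGroup(List.of(
                new Key(2001, 10), new Key(2000, 5), new Key(2001, 30), new Key(2000, 25))));
    }
}
```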
3 MapReduce Tuning
- Number of Reduce Tasks
- Compression of Map Task output
- Shuffle parameters
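As a worked example of one shuffle parameter: a map task starts spilling when its sort buffer reaches `mapreduce.task.io.sort.mb` (default 100) times `mapreduce.map.sort.spill.percent` (default 0.80). The plain-Java sketch below (hypothetical `SpillEstimate`) turns that into a rough spill count for a given map output size. It is only a back-of-the-envelope estimate: the real buffer also stores per-record metadata, so actual spill counts can be higher. Other parameters often tuned alongside it include `mapreduce.task.io.sort.factor` (streams merged at once), `mapreduce.reduce.shuffle.parallelcopies` (parallel fetches per reducer), and the reduce count via `job.setNumReduceTasks(n)`.

```java
// Illustration only: estimated spills = ceil(mapOutputBytes / (sort.mb * spill.percent)).
class SpillEstimate {
    static long spillThresholdBytes(int sortMb, double spillPercent) {
        return (long) (sortMb * 1024L * 1024L * spillPercent);
    }

    // Ceiling division of the map output size by the spill threshold.
    static long estimateSpills(long mapOutputBytes, int sortMb, double spillPercent) {
        long threshold = spillThresholdBytes(sortMb, spillPercent);
        return (mapOutputBytes + threshold - 1) / threshold;
    }

    public static void main(String[] args) {
        long out = 250L * 1024 * 1024; // hypothetical 250 MB of map output
        System.out.println("sort.mb=100: " + estimateSpills(out, 100, 0.80) + " spills");
        System.out.println("sort.mb=200: " + estimateSpills(out, 200, 0.80) + " spills");
    }
}
```

For this hypothetical 250 MB of map output, doubling the buffer from 100 MB to 200 MB lowers the estimate from 4 spills to 2, which also means fewer files to merge afterwards.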