一、InputFormat 数据输入
- 数据块:Block是HDFS上将文件分割成为物理块
- 数据切片:逻辑上对数据进行分片
- 数据切片与MapTask并行度决定机制
1)一个Job的并行度是由谁决定?? 由数据切片数目决定
2)每一个split切片会分到一个MapTask上运行。
3)如果切片大小和数据块大小不相同,一个数据块就会被分割,缺点是会产生数据传输的消耗。
4)根据3),默认的情况下,切片大小和数据块的大小是相同的。
5)切片的时候不会吧数据集作为一个整体进行切片,而是会吧单个文件作为整体进行划分切片。这样,最后一个文件块会被当作一个单独的切片,单独分配到一个MapTask中。理由如3)
二、job源码分析
drive 类
public class AccessLocalApp {
public static void main(String[] args) throws Exception {
// 获取配置信息
Configuration configuration = new Configuration();
// 获取job对象实例
Job job = Job.getInstance(configuration);
// 指定本程序的jar包所在的本地路径
job.setJarByClass(AccessLocalApp.class);
// 指定本程序自定义的Mapper和Reducer
job.setMapperClass(AccessMapper.class);
job.setReducerClass(AccessReducer.class);
// 自定义分区
job.setPartitionerClass(AccessPartitioner.class);
// 设置分区数
job.setNumReduceTasks(3);
// Mapper输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Access.class);
// Reducer输出的数据类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Access.class);
输入输出的路径
FileInputFormat.setInputPaths(job, new Path("/Users/mac126/code/spring/hadoopapi/access/input/access.log"));
FileOutputFormat.setOutputPath(job, new Path("/Users/mac126/code/spring/hadoopapi/access/output/"));
// 以上都是Job的一些配置信息。
// 从下面还是正式Job的提交和运行
job.waitForCompletion(true);
}
}
Job的提交和运行
job.waitForCompletion();
submit();
// 1. 建立连接
connect();
new Cluster(getConfiguration());
initialize(jobTrackAddr, conf);
// 2. 创建提交对象,提交job
submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
submitter.submitJobInternal(Job.this, cluster);
// 2.1 检查job的输出空间
checkSpecs(job);
// 2.2 创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2.3 获取jobid,和Stag路径一起构成job路径
JobID jobId = submitClient.getNewJobID();
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// 2.4 拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 2.5 计算切片,生成切片规划文件
int maps = writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
List<InputSplit> splits = input.getSplits(job);
// 2.6 向stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 2.7 提交job 返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
切片源码分析:
1) 找到数据存储目录;
2)开始遍历处理目录下的每一个文件
3)遍历第一个文件
3.1) 获取文件的大小fs.sizeOf()
3.2)计算切片的大小