MapReduce详细工作流程之Map阶段
- 首先有一个200M的待处理文件
- 切片:在客户端提交之前,根据参数配置,进行任务规划,将文件按128M每块进行切片
- 提交:提交可以提交到本地工作环境或者Yarn工作环境,本地只需要提交切片信息和xml配置文件,Yarn环境还需要提交jar包;本地环境一般只作为测试用
- 提交时会将每个任务封装为一个job交给Yarn来处理(详细见后边的Yarn工作流程介绍),计算出MapTask数量(等于切片数量),每个MapTask并行执行
- MapTask中执行Mapper的map方法,此方法需要k和v作为输入参数,所以会首先获取kv值;
- 首先调用InputFormat方法,默认为TextInputFormat方法,在此方法调用createRecoderReader方法,将每个块文件封装为k,v键值对,传递给map方法
- map方法首先进行一系列的逻辑操作,执行完成后最后进行写操作
- map方法如果直接写给reduce的话,相当于直接操作磁盘,太多的IO操作,使得效率太低,所以在map和reduce中间还有一个shuffle操作
- map处理完成相关的逻辑操作之后,首先通过outputCollector向环形缓冲区写入数据,环形缓冲区主要两部分,一部分写入文件的元数据信息,另一部分写入文件的真实内容
- 环形缓冲区的默认大小是100M,当缓冲的容量达到默认大小的80%时,进行反向溢写
- 在溢写之前会将缓冲区的数据按照指定的分区规则进行分区和排序,之所以反向溢写是因为这样就可以边接收数据边往磁盘溢写数据
- 在分区和排序之后,溢写到磁盘,可能发生多次溢写,溢写到多个文件
- 对所有溢写到磁盘的文件进行归并排序
- 在9到10步之间还可以有一个Combine合并操作,意义是对每个MapTask的输出进行局部汇总,以减少网络传输量
- Map阶段的进程数比Reduce阶段要多,所以放在Map阶段处理效率更高
- Map阶段合并之后,传递给Reduce的数据就会少很多
- 但是Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出kv要和Reduce的输入kv类型对应起来
MapReduce详细工作流程之Reduce阶段
- 所有的MapTask任务完成后,启动相应数量的ReduceTask(和分区数量相同),并告知ReduceTask处理数据的范围
- ReduceTask会将MapTask处理完的数据拷贝一份到磁盘中,并合并文件和归并排序
- 最后将数据传给reduce进行处理,一次读取一组数据
- 最后通过OutputFormat输出
Shuffle机制
- MapTask收集map()方法输出的kv对,放到环形缓冲区中
- 从环形缓冲区不断溢出到本地磁盘文件,可能会溢出多个文件
- 多个溢出文件会被合并成大的溢出文件
- 在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
- ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
- ReduceTask将取到的来自同一个分区不同MapTask的结果文件进行归并排序
- 合并成大文件后,shuffle过程也就结束了,进入reduce方法
Yarn工作机制
- MR程序提交到客户端所在的节点,YarnRunner向ResourceManager申请一个Application
- RM将该Application的资源路径和作业id返回给YarnRunner
- YarnRunner将运行job所需资源提交到HDFS上
- 程序资源提交完毕后,申请运行mrAppMaster
- RM将用户的请求初始化成一个Task
- 其中一个NodeManager领取到Task任务
- 该NodeManager创建容器Container,并产生MRAppmaster
- Container从HDFS上拷贝资源到本地
- MRAppmaster向RM 申请运行MapTask资源
- RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器
- MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序
- MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask
- ReduceTask向MapTask获取相应分区的数据
- 程序运行完毕后,MR会向RM申请注销自己
10-09 05:12