Hadoop是 Apache 旗下的一个用 java 语言实现开源软件框架,是一个开发和运行处理大规模数据的软件平台。允许使用简单的编程模型在大量计算机集群上对大型数据集进行分布式处理。
特性:扩容能力,成本低,高效 ,可靠性
首次启动 HDFS 时,必须对其进行格式化操作。本质上是一些清理和准备工作,因为此时的 HDFS 在物理上还是不存在的
常用端口号
- namenode 50070
- datenode 50075
- secondarynamenode 50090
- yarn.resoucemanager 8088
- 历史服务器 19888
目录结构及文件分块位置信息叫做元数据。
HDFS存储规则尽可能不存储小文件,因为一条元数据理论上150kb,若10W个小文件,存储元数据内存就要15M。
HDFS读写数据流程
1)写数据
1. 客户端向namenode发出申请
2. namenode查看是否有权限, 是否有同名文件
3. namenode向客户端返回是否允许上传
4. 对文件进行切割, 切割成大小为128MB的block块, 1.x 默认64MB, 2.x 默认128MB
5. 开始上传第一个block
6. namenode根据配置文件中指定的备份数量及副本放置策略进行文件分配, 返回datenode地址,通过机架感知选出datenode列表,如node01,node03
机架感知--3副本机制: 判断客户端所在位置, 如果客户端在当前HDFS集群上的一台, 那么第一个副本会放在客户端所在的机器上, 第二个副本不同于第一个, 放在不同于第一台的另一个机架, 第三个与第二个在同一个机架上, 但在不同的节点上.
7. 客户端请求2台datenode中的node01上传数据, 本质上是调用RPC, 建立pipeline连接, node01继续调用node03, 将整个pipeline连接完成,然后逐步返回客户端
8. 客户端开始向node01开始上传第一个block块, 先从磁盘读取放到本地内存缓存, 以packet为单位, 默认64KB, node01收到一个packet就会传给node03, node01每传一个packet就将其放入一个应答队列等待应答.
9. 数据被分割成一个个packet数据包在pipeline上一次传输, 在pipeline反方向上, 备份节点存储成功后逐个发送ack正确码, 最终由pipeline中第一个DataNode节点node01将pipeline ack确认码发送给Client.
10. 继续依次上传一个个block块
2)读数据
1. 客户端向namenode发起RPC请求, 确定文件block的所在位置
2. namenode返回元数据, 对于每个block, namenode都会返回副本所在datenode地址
3. 返回的datenode地址会根据集群拓扑结构算出与客户端的距离, 并进行排序.
排序两个规则: 网络拓扑结构中与客户端距离近的排靠前, 心跳机制中汇报超时的datenode状态为stale的排靠后
4. 客户端通过排靠前的datenode读取block, 若客户端就是datenode, 则从本地直接读取
5. 底层本质上是建立 socket stream (FSDateInputFormat) , 重复的调用父类DataInputFormat的read方法, 直到这块读取完毕
6. 读完列表的block, 文件读取还没有结束, 客户端继续向block申请下一批block
7. 读完一个block会进行checksum验证, 如果读取datenode出现错误, 会通知namenode, 找下一个存有这个block备份的datenode
8. read方法的并行读取block块, 不是一块一块读取, namenode只返回datenode的地址, 不返回块的数据
9. 最终读取的所有block合并成一个大文件
MapReduce 框架结构
一个完整的 mapreduce 程序在分布式运行时有三类实例进程:
- MRAppMaster:负责整个程序的过程调度及状态协调
- MapTask:负责 map 阶段的整个数据处理流程
- ReduceTask:负责 reduce 阶段的整个数据处理流程
MR编程(天龙八部)
map阶段:2步
第一步:设置InputFormat(通常使用TextInputFormat)的类型和数据的路径---获取数据的过程(可以得到K1,V1)
第二步:自定义Mapper--将K1,V1转为K2,V2
shuffle阶段:4步
第三步:分区的动作,如果有多个reduce才去考虑分区,默认只有一个reduce,分区可以省略
第四步:排序,默认对K2进行排序(字典序)--管好K2就行
第五步:规约,combiner是一个局部的reduce,map端的合并,是对mapreduce的优化操作,不会影响任何结果,减少网络传输,默认可以省略
第六步:分组,相同的K(K2)对应的V会放到同一个集合中---将map传递的K2,V2变成新的K2,V2
Reduce阶段:2步
第七步:自定义Reducer 得到K2,V2,转为K3,V3
第八步:设置OutputFormat和数据的路径--生成结果文件
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。
1 //反序列化的方法,反序列化时, 2 //从流中读取到的各个字段的顺序应该与序列化时 3 //写出去的顺序保持一致 4 @Override 5 public void readFields(DataInput in) throws IOException { 6 upflow = in.readLong(); 7 dflow = in.readLong(); 8 sumflow = in.readLong(); 9 } 10 //序列化的方法 11 @Override 12 public void write(DataOutput out) throws IOException { 13 out.writeLong(upflow); 14 out.writeLong(dflow); 15 out.writeLong(sumflow); 16 } 17 @Override 18 public int compareTo(FlowBean o) { 19 //实现按照 sumflow 的大小倒序排序 20 return sumflow>o.getSumflow()?-1:1; 21 }
compareTo方法用于将当前对象与方法的参数进行比较。 如果指定的数与参数相等返回 0。 如果指定的数小于参数返回 -1。 如果指定的数大于参数返回 1。
例如:o1.compareTo(o2) 返回正数的话,当前对象(调用 compareTo 方法的对象 o1)要排在比较对象(compareTo 传参对象 o2)后面,返回负数的话,放在前面。
分组
Mapreduce 中会将 map 输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask。 默认的分发规则为:根据 key 的 hashcode%reducetask 数来分发
combiner规约
每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络 IO 性能,是 MapReduce 的一种优化手段之一。
- combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
- combiner 组件的父类就是 Reducer
- combiner 和 reducer 的区别在于运行的位置:
- combiner 是在每一个 maptask 所在的节点运行
- reducer 是接收全局所有 Mapper 的输出结果;
- combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
具体实现步骤:
- 自定义一个 combiner 继承 Reducer,重写 reduce 方法
- 在 job 中设置: job.setCombinerClass(CustomCombiner.class)
combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来。
MapTask 工作机制
ReduceTask 工作机制
Shuffle 机制
map 阶段处理的数据如何传递给 reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 shuffle。
- 将map端结果输出到环形缓冲区, 默认为100M, 保存的是<key, value>和分区信息多个ReduceTask时才需要分区
- 当环形缓冲去到达80%时, 写入磁盘中, 在写入之前对数据进行快排, 如果配置了combiner, 还会对有相同分区号和key进行排序
Combiner规约:作用就是对map端的输出先做一次合并, 以减少在map和reduce节点之间的数据传输量, 以提高网络IO性能, 是MR的一种优化手段之一
- 将所有溢出的临时文件进行一次合并操作, 确保一个MapTask最终只生成一个文件
- Reduce复制一份数据, 默认保存在内存的缓冲区中, 到达阈值, 将数据写到磁盘
- Reduce复制数据同时, 开启两个线程对内存到本地的数据进行合并
- 进行合并数据同时, 进行排序, 因为Map端已进行局部排序, Reduce只需保证数据整体有效
压缩机制map输出压缩(减少shuffle过程中网络传输)reduce输出压缩(减少HDFS存储)
压缩
DEFLATE:自带,不可切分,和文本处理一样, 不需要修改
Gzip:自带,不可切分,和文本处理一样, 不需要修改
Bzip2:自带,可以切分,和文本处理一样, 不需要修改
LZO:需要安装,可以切分,建立索引指定输入格式
snappy:需要安装,不可切分,和文本处理一样, 不需要修改
切片机制
- 单纯按照文件内容切分
- 切片大小等于block块大小
- 不考虑数据集整体, 逐个针对文件切片
- 切片公式: max(0, min(Long_max, blockSize))
DistributedCache 分布式缓存
Map 端 Join 解决数据倾斜,我们为每一个 MapTask 准备一个表的全表数据文件。这种机制叫做 Map Side Join。适用于关联表中有小表的情形。
CombineTextInputFormat
小文件处理场景 将 HDFS 上多个小文件合并到一个 InputSplit中,然后会启用一个 Map 来处理这里面的文件,以此减少 MR 整体作业的运行时间。
元数据的管理机制 checkpoint
NameNode 维护整个文件系统元数据
按照类型划分:文件信息、块信息、datanode节点信息
按照形式划分:内存元数据、文件元数据
- 内存元数据:文件信息、块信息、datanode节点信息
- 文件元数据:只包含文件信息,其他信息是在datanode启动的时候进行上报
fsimage镜像文件
通常很大,保存的是一个小时前文件信息,是元数据的一个持久化的检查点,包含 Hadoop 文件系统中的所有目录和文件元数据信息,但不包含文件块位置的信息。文件块位置信息只存储在内存中,是在 datanode 加入集群的时候,namenode 询问 datanode 得到的,并且间断的更新。
edits日志
通常比较小,保存最近更改的文件信息,存放的是 Hadoop 文件系统的所有更改操作(文件创建,删除或修改)的日志,文件系统客户端执行的更改操作首先会被记录到 edits 文件中,只有当所有的写操作都执行完成之后,写操作才会返回成功。
SecondaryNameNode
职责是合并NameNode的edit logs到fsimage文件中, 减小edit logs文件的大小和得到一个最新的fsimage文件
Secondary Namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地, 并加载到内存进行merge, 这个过程称为checkpoint. 一个小时检查一次fsimage,更新edits的数据到fsimage中。
安全模式(其中有个概念叫最小的副本的副本率)
HDFS所处的一种特殊状态,在这种状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求,是一种保护机制,用于保证集群中的数据块的安全性。 如果 HDFS 处于安全模式下,不允许 HDFS 客户端进行任何修改文件的操作,包括上传文件,删除文件,重命名,创建文件夹,修改副本数等操作。
Yarn
是一个资源管理, 任务调度系统
ResourceManager:所有资源的监控, 分配和管理, 是一个全局的资源管理系统
NodeManager:每一个节点的维护,是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控
ApplicationMaster:具体每一个应用程序的调度和协调, 分配任务, 用户提交的每个应用程序均包含一个AppMaster
对于所有的applications, RM拥有绝对的控制权和对资源的分配权. 而每个AM则会和RM协商资源, 同时和NodeManager通信来执行和监控task.
Yarn工作流程
- 客户端向RM提交程序, 包括AppMaster的必须信息,例如 ApplicationMaster 程序、启动 ApplicationMaster 的命令、用户程序等。
- RM启动Container用来运行AppMaster
- 启动中的AppMaster向RM注册, 启动完成后与RM保持心跳
- AppMaster向RM申请相应数量的Container
- RM返回AppMaster申请的Container信息, 由AppMaster对其进行初始化
- AppMaster与NM进行通信, 要求NM启动Container, 两者保持心跳, 从而对NM运行的任务进行管理和监控
- AppMaster对运行中的Container进行监控, Container通过RPC协议对相应的NM汇报信息
- 客户端通过AppMaster获取运行状态进度等信息
- 结束后, AppMaster向RM进行注销, 并回收Container
调度器
1)先进先出调度器(企业一定不会):先进先出, 同一个时间队列只有一个任务执行
2)容量调度器(Hadoop2.7.2默认调度器):多队列, 每个队列内部先进先出, 同一个时间队列只有一个任务执行, 队列并行度为队列个数,
- 每个队列可配置资源
- 对同一用户提交的作业所占资源进行限定
- 首先, 计算每个队列中正在进行的任务与所分得资源的比值, 选出一个值最小的队列, 也就是最闲的
- 其次, 按照作业优先级和提交时间顺序, 同时考虑用户资源量限制和内存限制, 对队列中的任务排序
- 队列同时按照提交任务的顺序并行运行
3)公平调度器:多队列, 每个队列内部按照缺额大小分配资源启动任务, 同一时间队列只有一个任务执行, 队列并行度大于队列个数,支持多队列多用户, 每个队列的资源量可以配置, 所以任务共享资源量
- 每个队列的任务按照优先级分配资源, 优先级越高资源越多, 但是每个任务都可以分配到资源能保证公平
- 任务理想计算资源与实际计算资源的差值叫做缺额
- 同一队列中, 缺额越大, 任务优先级越高, 可多个作业同时运行
Hadoop High Availability
HA(High Available), 高可用,是保证业务连续性的有效解决方案,一般有两个或两个以上的节点,分为活动节点(Active)及备用节点(Standby)。
脑裂
在HA环境中, active的NM出现假死状态, stanby接收不到active的心跳, 判断active的NM处于宕机, 但实际上未死亡, stanby转换为active, 若此时原来active的NM复活, 此时有两台active的NM, 称为脑裂
危害
1. 造成资源争夺
2. 造成数据不统一
Hadoop Federation
联邦机制就是会有多个NameNode。多个 NameNode 的情况意味着有多个 namespace(命名空间),区别于 HA 模式下的多 NameNode,它们是拥有着同一个 namespace。
概括:
- 多个 NN 共用一个集群里的存储资源,每个 NN 都可以单独对外提供服务。
- 每个 NN 都会定义一个存储池,有单独的 id,每个 DN 都为所有存储池提供存储。
- DN 会按照存储池 id 向其对应的 NN 汇报块信息,同时,DN 会向所有 NN 汇报本地存储可用资源情况。
缺点
并没有完全解决单点故障问题。 所以一般集群规模真的很大的时候,会采用 HA+Federation 的部署方案。也就是每个联合的 namenodes 都是 ha 的。
Hadoop参数调优
- 在hdfs-site.xml文件中配置多目录,最好提前配置好,否则更改目录需要重新启动集群
- NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。
- dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为10台时,此参数设置为60
- 编辑日志存储路径dfs.namenode.edits.dir设置与镜像文件存储路径dfs.namenode.name.dir尽量分开,达到最低写入延迟
- 服务器节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。 yarn.nodemanager.resource.memory-mb
- 单个任务可申请的最多物理内存量,默认是8192(MB)。yarn.scheduler.maximum-allocation-mb
Hadoop宕机
- 如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
- 如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。
Hadoop基准测试