: //:什么是hadoop?
hadoop是解决大数据问题的一整套技术方案 :hadoop的组成?
核心框架
分布式文件系统
分布式计算框架
分布式资源分配框架
hadoop对象存储
机器计算 :hadoop 云计算 大数据 微服务 人工智能关系
参见word学习文档
. 现阶段,云计算的两大底层支撑技术为“虚拟化”和“大数据技术” . 而HADOOP则是云计算的PaaS层的解决方案之一,并不等同于PaaS,更不等同于云计算本身。 :大数据项目的通常结构
采集数据
数据分析统计
数据展示 :大数据项目的通常技术架构
见画图 Hadoop Common:
为其他Hadoop模块提供基础设施。
Hadoop DFS:
一个高可靠、高吞吐量的分布式文件系统
Hadoop MapReduce:
一个分布式的离线并行计算框架
Hadoop YARN:
一个新的MapReduce框架,任务调度与资源管理 :安装一个伪分布式的hdfs
a:准备安装介质
hadoop-2.8..tar.gz
b:把安装介质上传到linux
c:在linux使用hostname命令确认主机名
d:编辑/etc/hosts文件 完成ip地址和主机名的映射
在分布式的每一台机器中都需要把所有机器的ip地址和主机名的映射关系配置 注意:关闭每台机器的防火墙
systemctl stop firewalld.service 关闭
systemctl disable firewalld.service 禁止开机启动
e:配置ssh免密码登录
f:从/export/software/下面把hadoop-2.8..tar.gz 解压到/export/servers/下
常用目录说明:
bin sbin hadoop常用的命令目录 配置到/etc/profile中
etc hadoop常用配置文件目录
share hadoop核心jar包目录
g:完成配置文件
hadoop-env.sh 配置java环境
core-site.xml hadoop核心配置
hdfs-site.xml hdfs核心配置
mapred-site.xml mr的核心配置
yarn-site.xml yarn的核心配置
h:格式化namenode环境
创建namenode保存数据的环境
hdfs namenode -format
i:使用命令启动hdfs
start-dfs.sh
start-yarm.sh
执行完成回到命令行
可以使用jps查看关键进程是否已经在运行
namenode
SecondaryNameNode
datanode 还要从web页面使用http连接管理页面查看
http://192.168.21.134:50070 可以看到hdfs的管理页面 证明hdfs安装并启动成功
http://192.168.21.134:8088 可以看到mr计算任务的管理页面 证明mr yarn安装并启动成功 注意事项:
a:如果某个进程没有正确启动 要学会看日志
eg:
starting namenode, logging to /export/service/hadoop-2.8./logs/hadoop-root-namenode-text4.out
以上输出信息是说明namenode启动过程写入了哪一个日志
假设最终namenode没有启动成功 需要打开日志查看原因 b:常用命令中还有单独启动某个进程的命令
hdfs单独启动各个进程服务
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
hadoop-daemon.sh start secondarynamenode
yarn单独启动各个进程服务
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager :安装分布式
a:已有一台虚拟机基础环境配置完毕 text4
b:利用有的虚拟机复制出2台 一共3台虚拟机
复制以已及每台配置网络 参照文档即可
b1:配置每台机器的主机名(提前规划好)(text4 text5 text6)
text4 —— namenode datanode secondarynamenode resourcemanager nodemanager
text5 —— datanode nodemanager
text6 —— datanode nodemanager
b2:修改网卡配置(每台机器的ip地址是提前规划好)(192.168.21.134 192.168.21.138 192.168.21.139)
b3:修改/etc/hosts文件 配置每台机器和ip地址的映射关系
c:配置3台之间可以ssh免密登录
ssh-keygen
ssh-copy-id root@机器名称
c1:配置text4 text5 text6互相都可以ssh免密登录
c2:选取一台虚拟机做namenode 配置它到text5 text6免密登录 可以在多个机器之间使用scp传输文件 如果不需要输入密码 则ssh免密配置正确
scp -r(目录整体复制) 目录/文件名称 用户名@机器名:目的机器的路径
d:确保所有机器防火墙都是关闭
e:修改每台机器的hadoop配置文件
e1:把每个block(数据块)修改为有2个备份
e2:把namenode相关的ip地址都修改成了主机名
f:把已有的namenode datanode配置好的数据文件夹删除
g:在namenode节点 执行namenode格式化 hdfs namenode -format
h:在namenode执行start-dfs.sh 启动hdfs
使用jps查看每台机器的进程规划 :namenode datanode 数据目录结构讲解
namenode:
存放数据目录位置 /data/hadoop/dfs/name datanode:
存放数据目录位置 /data/hadoop/dfs/data
实际上传到datanote中的文件数据都是保存在finalized目录下
eg:从linux 往hdfs上传一个文件 a0001.data
因为配置的block副本是2分 所以3台datanode节点中只有text4 text5有数据
经验证和原有的上传文件内容一致 就以2个副本的方式保证了数据的完整性 eg:从linux往hdfs上传一个大于128m的文件hadoop-2.8..tar.gz(240m)
因为它大于128m(128m是hdfs中默认的一个block的大小) 所以这个压缩包被分成2个block上传
每个block还是2个备份文件 hdfs dfs -put 待上传的文件名称 要上传的hdfs的目的地路径 secondarynamenode:
存放数据目录位置 /data/hadoop/tmp/dfs/namesecondary start-all.sh
启动所有的Hadoop守护进程。包括NameNode、 Secondary NameNode、DataNode、JobTracker、 TaskTrack
stop-all.sh
停止所有的Hadoop守护进程。包括NameNode、 Secondary NameNode、DataNode、JobTracker、 TaskTrack
start-dfs.sh
启动Hadoop HDFS守护进程NameNode、SecondaryNameNode和DataNode
stop-dfs.sh
停止Hadoop HDFS守护进程NameNode、SecondaryNameNode和DataNode
hadoop-daemons.sh start namenode
单独启动NameNode守护进程
hadoop-daemons.sh stop namenode
单独停止NameNode守护进程
hadoop-daemons.sh start datanode
单独启动DataNode守护进程
hadoop-daemons.sh stop datanode
单独停止DataNode守护进程
hadoop-daemons.sh start secondarynamenode
单独启动SecondaryNameNode守护进程
hadoop-daemons.sh stop secondarynamenode
单独停止SecondaryNameNode守护进程
start-mapred.sh
启动Hadoop MapReduce守护进程JobTracker和TaskTracker
stop-mapred.sh
停止Hadoop MapReduce守护进程JobTracker和TaskTracker
hadoop-daemons.sh start jobtracker
单独启动JobTracker守护进程
hadoop-daemons.sh stop jobtracker
单独停止JobTracker守护进程
hadoop-daemons.sh start tasktracker
单独启动TaskTracker守护进程
hadoop-daemons.sh stop tasktracker
单独启动TaskTracker守护进程 sc文件:hdfs dfs -put aa.data /logs io nio aio思想 b:编写hdfs java客户端程序
b1:建立开发工程
建立maven工程
在pom文件中引入开发hdfs客户端需要的jar包
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.</version>
</dependency>
建立普通的java工程
引入jar包 目前只是开发hdfs的java客户端 只引入hadoop安装包下share目录中common和hdfs jar包即可 b2:规划包结构
b3:编写demo测试类
创建一个配置信息封装对象(客户端处理hdfs 编写mr 处理hive 处理hase都是需要配置信息)
创建一个文件系统的客户端对象 方便处理文件数据
对文件的增删改查的java api接口使用
对某个指定目录下所有文件或者目录基本信息的查看 hdfs复习点:
:hdfs就是一个文件系统 在操作系统之上。
通常操作系统针对hdfs叫做本地 :hdfs的重要组成部分
namenode:存储元数据
元数据中数据都有什么?
a:某个文件的某个block存储在哪个datanode上
b:每个datanode资源使用情况
c:每个文件的属性 修改时间等等信息
datanode:
真正存储文件数据
所以一个hdfs没有namenode或者namenode出问题则不能再使用
但是如果是datanode出问题 只可能是数据部分丢失 而不是hdfs不能使用
secondary namenode namenode datanode 关系
https://www.cnblogs.com/chenyaling/p/5521464.html
:基本的理论知识
: //: //14https://blog.csdn.net/lvtula/article/details/82354989
a:hdfs中namenode datanode secondarynamenode 某些主要目录或者文件的作用
https://blog.csdn.net/baiye_xing/article/details/76268495
namenode:edits fsimage version
datanode:finalized version blc文件每个都有一个meta文件
secondarynamenode:edits fsimage
b:namenode中edits和fsimage的作用以及其和内存的相互关系
c:namenode和secondarynamenode的工作机制
d:hdfs写流程
hdfs读流程 :hdfs shell :java api操作hdfs HA hadoop集群搭建步骤:
:对集群每一台机器需要安装什么服务进行规划。
text4:zk nn dn zkfc jn nm rm
text5: zk nn dn zkfc jn nm rm
text6: zk dn jn nm :准备每台机器的基本环境
a:3台机器之间需要免密登录
b:每台机器必须安装好jdk .0以上
c:集群每台机器之间时间同步
c1:设定每台机器的正确时区
timedatectl set-timezone Asia/Shanghai
timedatectl set-local-rtc
date
c2:选择集群中一台机器为主 master 让其它机器和这台机器完成时间同步
使用rdate完成集群之间时间同步
具体操作请参见保存的时间同步页面说明 :修改hadoop集群的配置文件
按照样例配置文件修改即可
:启动集群过程(安装过程的启动)
a:启动zk
b: 启动jn
hadoop-daemon.sh start journalnode
c:格式化nn (选取text4)
d:格式化zkfc
hdfs zkfc -formatZK
e:把text4的nn结构复制到text5
先在text4启动nn
在text5执行 hdfs namenode bootstrapstandby
如果出现此时在text5并没有能成功复制text4的nn的目录机构
则直接可以从text4复制到text5 完成2台nn之间的复制
f:启动集群
zk启动
zkfc启动
start-dfs.sh
start-yarn.sh(如果那个rm没有启动 直接可以使用守护进程) :什么是hadoop?
hadoop是解决大数据问题的一整套技术方案 :hadoop的组成?
核心框架
分布式文件系统
分布式计算框架
分布式资源分配框架
hadoop对象存储
机器计算 :hadoop 云计算 大数据 微服务 人工智能关系
参见word学习文档 :大数据项目的通常结构
采集数据
数据分析统计
数据展示 hadoop-env.sh 配置java环境
core-site.xml hadoop核心配置
hdfs-site.xml hdfs核心配置
mapred-site.xml mr的核心配置
yarn-site.xml yarn的核心配置 namenode:
存放数据目录位置 /data/hadoop/dfs/name namenode的数据文件:
namenode保存的是整个hdfs的元数据
eg:
上传文件的所属 大小 修改日期
以及文件每一个block所在哪个datanode对应关系都是namenode元数据保存的 edtis:保存的是最近的日志记录(namenode接收到的命令以及解析)
fsimage:保存的是namenode内存信息的镜像
seen_txid:集群状态的恢复标识 步骤解析1:
上传
、根namenode通信请求上传文件,namenode检查目标文件是否已存在,父目录是否存在
、namenode返回是否可以上传
、client请求第一个 block该传输到哪些datanode服务器上
、namenode返回3个datanode服务器ABC
、client请求3台dn中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将真个pipeline建立完成,逐级返回客户端
、client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答
、当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。 详细步骤解析2:
下载
、跟namenode通信查询元数据,找到文件块所在的datanode服务器
、挑选一台datanode(就近原则,然后随机)服务器,请求建立socket流
、datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验)
、客户端以packet为单位接收,现在本地缓存,然后写入目标文件 HA hadoop集群搭建步骤:
:对集群每一台机器需要安装什么服务进行规划。
text4:zk nn dn zkfc jn nm rm
text5: zk nn dn zkfc jn nm rm
text6: zk dn jn nm :准备每台机器的基本环境
a:3台机器之间需要免密登录
b:每台机器必须安装好jdk .0以上
c:集群每台机器之间时间同步
c1:设定每台机器的正确时区
timedatectl set-timezone Asia/Shanghai
timedatectl set-local-rtc
date
c2:选择集群中一台机器为主 master 让其它机器和这台机器完成时间同步
使用rdate完成集群之间时间同步
具体操作请参见保存的时间同步页面说明 :修改hadoop集群的配置文件
按照样例配置文件修改即可
:启动集群过程(安装过程的启动)
a:启动zk
b: 启动jn
hadoop-daemon.sh start journalnode
c:格式化nn (选取text4)
d:格式化zkfc
hdfs zkfc -formatZK
e:把text4的nn结构复制到text5
先在text4启动nn
在text5执行 hdfs namenode bootstrapstandby
如果出现此时在text5并没有能成功复制text4的nn的目录机构
则直接可以从text4复制到text5 完成2台nn之间的复制
f:启动集群
zk启动
zkfc启动
start-dfs.sh
start-yarn.sh(如果那个rm没有启动 直接可以使用守护进程) Zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务
A、zookeeper是为别的分布式程序服务的
B、Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
C、Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务……
D、虽然说可以提供各种服务,但是zookeeper在底层其实只提供了两个功能:
管理(存储,读取)用户程序提交的数据;
并为用户程序提供数据节点监听服务; 、Znode有两种类型:
短暂(ephemeral)(断开连接自己删除)
持久(persistent)(断开连接不删除)
、Znode有四种形式的目录节点(默认是persistent )
PERSISTENT
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
EPHEMERAL
EPHEMERAL_SEQUENTIAL
、创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序 、Zookeeper:一个leader,多个follower组成的集群
、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
、分布式读写,更新请求转发,由leader实施
、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
、数据更新原子性,一次数据更新要么成功,要么失败
、实时性,在一定时间范围内,client能读到最新数据 4.2. zookeeper数据结构
、层次化的目录结构,命名符合常规文件系统规范(见下图)
、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
、节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点,下一页详细讲解)
、客户端应用可以在节点上设置监视器(后续详细讲解) yarn的流程::客户端向yarn发送请求,开始运行job
:resourcemanager反馈信息,把切片资源在hdfs的存放目录发送给客户端
:客户端调用fileinputformat的getsplits方法完成数据切片计算
:客户端把切片资源上传到yarn指定的hdfs目录下
:通知yarn资源已上传,请分配mrappmaster开始执行任务
:yarn吧客户端待执行的任务放入任务队列
:resourcemanager从某个namemodemanager选择一台机器准备运行环境(根据充足的空间判定)
:制定后 mappermaster从指定位置获取任务资源(job.split jar job.xml)
:mappermaster从job.spilit中确认需要运行多少maptask,想yarn的rm申请运行资源
10resourcemanager给mrappmaster反馈可用的运行maptask的资源
11mrappmaster根据yarn提供的资源开始运行maptask
:当maptask运行完毕,mrappmaster会知道
:重复申请资源的步骤 开始运行reducetask
14当reducetask运行完毕 回收mappmaster的占用资源 yarn只是一个分布式资源分配系统,核心是resourcemanager和nodemanager
yarn只负责资源分配,不负责xuanfayunsuan sheff运行流程
map()输出结果->内存(环形缓冲区,当内存大小达到指定数值,如80%,开始溢写到本地磁盘)
溢写之前,进行了分区partition操作,分区的目的在于数据的reduce指向,分区后进行二次排序,第一次是对partitions进行排序,
第二次对各个partition中的数据进行排序,之后如果设置了combine,就会执行类似reduce的合并操作,还可以再进行压缩,
因为reduce在拷贝文件时消耗的资源与文件大小成正比
内存在达到一定比例时,开始溢写到磁盘上
当文件数据达到一定大小时,本地磁盘上会有很多溢写文件,需要再进行合并merge成一个文件
reduce拷贝copy这些文件,然后进行归并排序(再次merge),合并为一个文件作为reduce的输入数据 wordcount流程:
计算在某个目录下有n多份文件,每个文件中有n多个单词,计算每个单词出现的个数
:yarn的resourcemanager会反馈给客户端吧切片信息存放到那个目录下,然后客户端通过调用fileinputformat的getsplits进行切片机算,得到3样数据(
a:切片的描述信息jobsplit b:计算所有单词个数的jar包 c:把这次计算任务job的配置信息放入文件中的job。xml

:放置后会通知yarn分配mappermaster任务,会挑选一台nodemanager机器作为mrappmaster
:mrappmaster会根据job的描述信息 根据job。split确定需要几个 maptaskxiang yarn申请计算资源,
:yarn反馈给mrappmaster明确的可用的计算资源 :mrappmaster根据yarn提供的资源开始运行maptask
:maptask从hdfs中依据job。spilt的描述的数据从hdfs 复制到本地的server,调用wordcount程序完成计算
(计算:maptask运行map方法,map方法接受的是文本中的每一行的偏移量, v是每次map方法执行时读取的一行单词)
:map阶段执行完后在执行reduce阶段, reduce完成对map阶段输出的解过进行合并操作
reducetask的数量是程序员根据需求制定,如果不指定是1
:reduce完成后会输出问结果文件 1yarn的resourcemanager会反馈给客户端切片信息放置的目录,然后客户端通过调用fileinputformat的getsplits进行切片机算,得到三杨树局,
(a切片的描述信息b机损所有单词的jar包c将job的藐视信息放入job。xml)
2客户端放置后通知yarn分配mrappmaster执行任务,yarn会挑选一台namenodemanager作为mrappmaster
3mrappmaster会从指定位置下获取资源,根据job。splits计算需要多少个maptask,并向yarn申请及资源
4yarn反馈资源,mrappmaster给句yarn提供的资源开始运行maptask
5maptask从hdfs中依据job。split的描述的数据从hdfs复制到本地的server,调用wordcount完成计算 .hadoop运行原理
MapReduce
HDFS 分布式文件系统(HDFS客户端的读写流程)
写:
客户端接收用户数据,并缓存到本地
当缓存足够一个HDFS大小的时候
客户端同NameNode通讯注册一个新的块
注册成功后 NameNode给客户端返回一个DateNode的列表
客户端向列表中的第一个DateNode写入块
收到所有的DateNode确认信息后,客户端删除本地缓存
客户端继续发送下一个块
重复以上步骤 所有数据发送完成后,写操作完成
读:
客户端与NameNode通讯获取文件的块位置信息,包括块的所有冗余备份的位置信息:DateNode列表
客户端获取文件位置信息后直接同有文件块的DateNode通讯,读取文件
如果第一个DateNode无法连接,客户端将自动联系下一个DateNode
如果块数据的校验值出错,则客户端需要向NameNode报告,并自动联系下一个DateNode 客户端的hadoop环境:与集群的hadoop包一样
集群入口:core-site.xml、fs.default.name
缓存块大小:fs.block.size
存多少份:fs.replication .mapreduce的原理 mapreduce的原理:一个MapReduce框架由一个单独的master JobTracker和集群节点每一个slave TaskTracker共同组成。
master负责调度构成一个作业的所有任务,在这些slave上,master监控它们的执行,并且重新执行已经失败的任务。
而slave仅负责执行由maste指派的任务。 .Mapreduce数据倾斜是什么意思?怎么处理?
Mapreduce数据倾斜是指我们在分片的时候导不同分片上的数据不均,导致这些分片在并行处理的时候,有的分片执行事件过长,
有的执行时间过短,导致总的执行时间过长的一种现象,通常是由:.map端的key值过多或者有空值;
.业务本身的特性;.某些sql就有数据倾斜;.建表的时候考虑不周等原因造成的。
处理:* a:增加reducetask的数量
* b:在不改变整体统计结果的前提下,可以修改key的设定方式
* c:在做关联时,尽量避免reducetask端的join 可以使用maptask端的join .combiner的作用,使用时机?
Combiner其实也是一种reduce操作,是map运算的后续操作,在map后续对于相同key值做一个简单合并,减小后续的reduce的计算压力。 数据量小的时候,且输入的结果不会影响到reduce输入的结果,且不做平均值的时候,用基于map端之后shuffle端之前的reduce操作。 .MapReduce–如何设置Reducer的个数
.在代码中通过:JobConf.setNumReduceTasks(Int numOfReduceTasks)方法设置reducer的个数;
.在hive中:set mapred.reduce.tasks;
.reducer的最优个数与集群中可用的reducer的任务槽数相关,一般设置比总槽数微少一些的reducer数量;Hadoop文档中推荐了两个公式:
0.95*NUMBER_OF_NODES*mapred.tasktracker.reduce.tasks.maximum
1.75*NUMBER_OF_NODES*mapred.tasktracker.reduce.tasks.maximum
备注:NUMBER_OF_NODES是集群中的计算节点个数;
mapred.tasktracker.reduce.tasks.maximum:每个节点所分配的reducer任务槽的个数; .MR的过程:
input —–>spilt—–>map—–>combiner—–>shuffer—> partition—–>reduce—–>output
spilt :对数据进行split分片处理,产生K1值和V1值,传给map
map: 数据整理,把数据整理成K2和V2,
combiner:如果map输出内容比较多,reduce计算比较慢,我们可以加个combiner
map端的本地化reduce,减少map输出;
shuffer:相同的数据放到一个分区
partiton:如果reduce不是一个,shuffler做一个分区,将相同的K值,分到一个区;
排序方式:hash方式;
reduce:shuffer分区结束后交给reduce进行计算,形成K3 V
output: 将reduce处理完的 K3和V3交给output输出;
a. 客户端编写好mapreduce程序,提交job到jobtracker;
b. Jobtracker进行检查操作,确定输出目录是否存在,存在抛出错误;
c. Jobtracker根据输入计算输入分片input split、配置job的资源、初始化作业、分配任务;
d. 每个input split创建一个map任务,tasktracker执行编写好的map函数;
e. Combiner阶段是可选的,它是一个本地化的reduce操作,合并重复key的值;
f. Shuffle一开始就是map做输出操作,并对结果进行排序,内存使用达到阀值就会spill,把溢出文件写磁盘,写磁盘前有个排序操作,map输出全部做完后,
会合并溢出文件,这个过程中还有个Partitioner操作,一个partitioner对应一个reduce作业,reduce开启复制线程,复制对应的map输出文件,复制时候reduce还会进行排序操作和合并文件操作
g. 传输完成,执行编写好的reduce函数,结果保存到hdfs上。 .MR怎么处理小文件:
.输入过程合并处理:.在linux 10000个文件上传到HDFS时候,用脚本形成二进制文件流上传,上传的过程中就合并成了一个文件。
.如果在hdfs中有大量小文件,首先进行清洗,把10000个小文件清洗成一个文件或者几个文件,写个map(.前提10000小文件格式相同,
.不会有太多的小文件 一千万个小文件,首先会在操作系统上传时就处理完了,但是要是问可以说,分批做,每一万个存储到一个目录中,对一个目录进行map清洗)),其次,进行reduce计算
清洗会产生数据倾斜: 很多小文件是数据倾斜(解决方法):2.1.基于map端的离散方法;2.2.combiner;
//hdfs为什么怕很多小文件:因为很多小文件的话也会占用namenode目录树的空间,一般一个文件的元数据会占到150字节;
而NameNode是要接收集群中所有的DataNode的心跳信息,来确定元数据的信息变化的,当小文件一旦过多,namenode的元数据读取就会变慢。
(在HDFS中,namenode将文件系统中的元数据存储在内存中,因此,HDFS所能存储的文件数量会受到namenode内存的限制) .如何从编程的角度讲解MR的过程
对数据进行底层默认分片把数据解析成k1/v1形式传给map;
Map对k1/v1进行截取、运算等操作生成k2/v2传给reduce;
Reduce对相同key的值进行计算,生成最终结果k3/v3输出 .MR中有没有只有Map的
有,只对数据进行分片,解析成key/value形式后,直接输出结果不进行reduce端的去重和数组化的。
eg:比如说我把所有的经过split(map)形成的元素都放到context的key做标签就不会用到reduce。 .Map输出端的组成部份
Combiner、shuffle、partitioner .如何用MR实现join
) reduce side join(在reduce端做join操作)
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签 (tag),
比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
) map side join(在map端做join操作)
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,
让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,
如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
()用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是 HDFS上的文件,可以这样:
hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口 号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
()用户使用 DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。 DistributedCache方法:(DistributedCache 是一个提供给Map/Reduce框架的工具,用来缓存文件(text, archives, jars and so on)文件的默认访问协议为(hdfs://).
DistributedCache将拷贝缓存的文件到Slave节点在任何Job在节点上执行之前。
文件在每个Job中只会被拷贝一次,缓存的归档文件会被在Slave节点中解压缩。)
符号链接
每个存储在HDFS中的文件被放到缓存中后都可以通过一个符号链接使用。
URI hdfs://namenode/test/input/file1#myfile 你可以在程序中直接使用myfile来访问 file1这个文件。 myfile是一个符号链接文件。 .MAP如何排序
在map端一共经历两次的排序:
当map函数产生输出时,会首先写入内存的环形缓冲区,当达到设定的阈值,在刷写磁盘之前,
后台线程会将缓冲区的数据划分成相应的分区。在每个分区中,后台线程按键进行内排序,在Map任务完成之前,
磁盘上存在多个已经分好区,并排好序的、大小和缓冲区一样的溢写文件,
这时溢写文件将被合并成一个已分区且已排序的输出文件。
由于溢写文件已经经过第一次排序,所以合并分区文件时只需要再做一次排序就可使输出文件整体有序。 .什么是inputsplit
InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割
,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。 .MR中使用了哪些接口?(或者是抽象类)
FileinputFormat、Mapper、Reducer、FileoutputFormat、Combiner、Partitioner等
---------------------
作者:QianShiK
来源:CSDN
原文:https://blog.csdn.net/QianShiK/article/details/81480854
版权声明:本文为博主原创文章,转载请附上博文链接! 这个业务中map输出value包含日志数据中每一行的上行流量 下行流量
* 因为一次要输出3个值 所以我们通常都是编写一个自定义的类型
*
* 自定义map reducer的输出值类型?
* a:需要编写一个java类封装每一行日志数据中的上行流量和下行流量
* b:还需要在a中封装根据上行流量和下行流量计算总流量
* c:因为a中的java类做的是map的输出值类型 所以它需要符合hadoop io类型
* LongWritable Text IntWritable都实现了WritableComparable接口
* 反推就是想编写hadoop io类型就需要实现WritableComparable接口
*
* WritableComparable接口又包括了
* Writable接口(write,readFields两个方法) 这个接口是负责这个对象的序列化和反序列化
* Comparable接口(compareTo一个方法) 这个接口是负责对象比较大小/排序使用的
* 所以两个接口中Writable接口才是标识是否属于hadoop io的类型
*
* 又以为map的输出值不需要排序 只需要序列化 所以我们在这需求需要编写的map输出自定义类型只
* 需要实现Writable接口即可 切片怎么切的:切片的数量不是越大或者越小写好,而是要根据每次计算的实际数据凉,自定义优化的切片大小来控制切片的数量
比如有一共300m,前128m以一个切片,如果129到300m的存储空间小可以直接是一个切片 NIO
:什么是NIO?
NIO是基于通道和缓存的非阻塞IO。 :IO 和 NIO的区别?
a:通道在IO中只是一个便于理解的虚拟概念 而在NIO中通道是一个实际的概念
b:在IO中最底层的传输数据是字节 而在NIO中最小的传输都是缓存
c:在IO中 虚拟的通道直接会和数据接触 在NIO中通道直接面对的不是数据 而是缓存
d:在IO中某一个通道通常都是单向的 在NIO中通道是双向的
e:IO和NIO针对数据传输内存使用的方式不同 IO是面向流的 NIO是面向缓存的 :NIO中的缓存
在NIO中传输数据 都是把数据先放入某个缓存中 再在某个通道中 按照缓存传输
在java中原有针对NIO的开发包 java.nio.* 在java.nio包下直接都是可以使用的缓存类:
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
MappedByteBuffer
ShortBuffer :直接缓冲区和非直接缓冲区
a:系统内存(系统内核内存) 和 JVM内存(程序内存的区别)
b:IO是把数据先传入JVM内存 再从JVM内存复制到系统内存 组后从系统内存写入目的硬盘
NIO是把数据可以先传入硬盘内存区 再从硬盘内存区直接写入目的硬盘 buffer.allocate方法创建的缓冲区是在非直接缓冲区申请的内存
buffer.allocateDirect一旦使用 就是在直接缓冲区申请的内存 :通道
通道是为了替代cpu完成io操作 从而提升cpu的利用率
04-14 19:51