产生背景:sqoop抽取oracle数据到hive表时,只能写入到固定分区(--hive-partition-key #hive分区字段 --hive-partition-value #hive分区值)。于是先把数据抽取到一张增量表,然后从增量表动态写入分区表。

set hive.exec.dynamic.partition.mode = true;  --使用动态分区时,设置为ture。
set hive.exec.dynamic.partition.mode = nonstrict;  --动态分区模式,默认值:strict,表示必须指定一个分区为静态分区;nonstrict模式表示允许所有的分区字段都可以使用动态分区。一般需要设置为nonstrict。
set hive.exec.max.dynamic.partitions.pernode =10;  --在每个执行MR的节点上,最多可以创建多少个动态分区,默认值:100。
set hive.exec.max.dynamic.partitions =1000;  --在所有执行MR的节点上,最多一共可以创建多少个动态分区,默认值:1000。
set hive.exec.max.created.files = 100000;  --整个MR Job中最多可以创建多少个HDFS文件,默认值:100000。
set hive.error.on.empty.partition = false;  --当有空分区产生时,是否抛出异常,默认值:false。

  Hive文件产生大量小文件的原因:

    一是文件本身的原因:小文件多,以及文件的大小;

    二是使用动态分区,可能会导致产生大量分区,从而产生很多小文件,也会导致产生很多Mapper;

    三是Reduce数量较多,Hive SQL输出文件的数量和Reduce的个数是一样的。

  小文件带来的影响:

    文件的数量和大小决定Mapper任务的数量,小文件越多,Mapper任务越多,每一个Mapper都会启动一个JVM来运行,所以这些任务的初始化和执行会花费大量的资源,严重影响性能。

    在NameNode中每个文件大约占150字节,小文件多,会严重影响NameNode性能。

  解决小文件问题:

    如果动态分区数量不可预测,最好不用。如果用,最好使用distributed by分区字段,这样会对字段进行一个hash操作,把相同的分区给同一个Reduce处理;

    减少Reduce数量;

    进行以一些参数调整。

控制Mapper的数量:

  决定Mapper的数量的因素有:输入文件的个数,输入文件的大小、集群设置的文件块大小。

    例如:输入目录下有1个800M的文件,hadoop会将文件分成7个文件(6*128M + 1*32M),从而产生7个Mapper数;

    例如:输入目录下有5个文件,分别为:15M、20M、50M、100M、150M,那么hadoop会分隔成6个文件(15M、20M、50M、100M、128M、22M),从而产生6个Mapper。

  可以通过设置如下参数,让Map在执行之前合并小文件,从而减少Mapper数量:

set mapred.max.split.size=100000000;   -- 决定每个map处理的最大的文件大小,单位为B
set mapred.min.split.size.per.node=100000000;   -- 节点中可以处理的最小的文件大小
set mapred.min.split.size.per.rack=100000000;   -- 机架中可以处理的最小的文件大小
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;  ---实现map中的数据合并需要设置下面的参数,集群默认就是这个格式

  

  控制Mapper的整体原则:

    大数据量要利用合适的map数,单个map要处理合适的数据量;

    map占用资源要合并小文件,map不足要把大文件拆成小文件。

控制Reduce的数量:

  Reduce的个数会极大影响任务的执行效率

  • Hive自己确定reduce数

    不指定Reduce的个数的情况下,Hive会猜测确定一个Reduce个数,由下面两个参数决定:

    1、hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)

    2、hive.exec.reducers.max(每个任务最大的reduce数,默认为999)

    Reduce的个数N=min(参数2,输入总数据量/参数1),例如:如果Reduce的输入(map的输出)总大小不超过1G,那么只有一个Reduce任务。

  • 手动调整reduce数

    Hive官网:

In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number>
In order to set a constant number of reducers: set mapreduce.job.reduces=<number>

  Notes:动态分区采坑

    在使用动态分区的时候,如果已知数据会分成n个分区,SQL运行的时候创建了m个Mapper,则这个SQL产生m * n个文件。如果这个数值大于设置的创建文件的总数(hive.exec.max.created.files),默认值100000个,就会出现异常。

    在未知动态分区数时,可以使用distribute by 分区字段,将分区字段内容相同的数据放到同一个reduce,当然也可以使用distribute by rand()将数据随记分配给reduce,这样可以使每个reduce处理的数据大体相同。

  • 和map一样,启动和初始化reduce会消耗时间和资源,有多少reduce就会产生多少个文件
  • 以下情况下,会只有一个reduce:
    • 没有group by的汇总,如把select dt,count(1) from test where dt = '2019-12-12' group by dt;,写成select count(1) from test where dt = '2019-12-12';
    • 用了order by;
    • 有笛卡尔积;

  控制Reduce的整体原则:

    使大数据量利用合适的reduce数;

    使单个reduce任务处理合适的数据量。

  

05-11 16:16