这是一个面试经常面的问题,很不幸,在没有准备的时候,我面到了这个题目,反思了下,将这部分的内容进行总结,给大家一点分享。

        hive其实是基于hadoop的数据库管理工具,底层是基于MapReduce实现的,用户写的hivesql最终转换成MapReduce的任务运行在hadoop上,不过MapReduce会因为磁盘IO的问题会运行较慢,因此在Hive sql进行优化,就需要考虑到MapReduce的生命周期,在各个时间节点上进行调优,从而实现Hive sql的整体优化。

        如果要从MapReduce角度来分析,就需要从Map,Reduce,join,参数设置的角度来分析。

Map阶段

  • 尽早使用where条件:提前把不需要计算的数据过滤掉,而不是在进行复杂操作后再集中过滤。
  • 使用分区裁剪:Hive不同分区是按照不同目录存放的,指定分区可以访问特定的目录,减少数据量。
  • 使用列裁剪:尽量不要使用select * from ...,指定特定列会只扫描特定列而不扫描全表,提高执行速度,同时select * 会让优化器无法完成索引覆盖扫描这类优化,会影响优化器对执行计划的选择,也会增加网络带宽消耗,更会带来额外的 I/O,内存和 CPU 消耗。
  • 相似任务尽量使用多路输出:相同的计算只需要计算一次,减少重复计算,同时也能减少reduce task
  • 减少case when中的when:表中的文件都需要走一遍when流程,when越多效率就越低,而且在reduce阶段最好做一遍合并压缩操作,否则可能会产生很多文件。

reduce 阶段

  • 使用 group by 代替 distinct:因为distinct会把所有任务都分配到一个reduce task中。
  • 使用 sort by + distribute by代替 order by:order by 和 distinct 类似,在reduce阶段都会把所有的任务集中到一个reduce task中计算,使用 sort by 和 distribute by 后MR会根据情况启动多个reduce来排序,不过记得一定要加distribute by,否则map后的数据会随机分配到reducer中,不能保证全局有序。
  • 尽量使用union all代替union:union去重,有shuffle,union all不去重,无shuffle,shuffle会造成数据在集群中传输,并且伴随着读和写,很影响任务的执行性能。如果要去重,可以最后用group by。

join task过程优化        

  • 避免使用笛卡尔积:尽量有关联键,hive本身不支持笛卡尔积,需要先用set hive.mapred.mode=nonstrict设为非strict模式。
  • 多表join查询时,小表在前,大表在后,Hive在解析带join的SQL语句时,会默认将最后一个表作为probe table(大表),将前面的表作为build table(小表)并试图将它们读进内存(是否读入内存可以配置)。如果表顺序写反,probe table在前面,有引发OOM的风险。
  • 小表超出内存限制,采用多次join:build table没有小到可以直接读如内存,但是相比probe table又很小,可以将build table拆成几个表,分别join。
  • 小表join大表,尽量使用map join:将build table和probe table在map端直接完成join过程,没有了reduce,效率高很多。
  • 多表join时如果允许尽量使用相同的key:这样会将多个join合并为一个MR job来处理。
  • join时保证关联键类型相同:如果不同时也适用cast进行转换,否者会导致另外一个类型的key分配到一个reducer上。
  • join的时候如果关联健某一类值较多先过滤:比如空值、0等,因为这会导致某一个reducer的计算量变得很大,可以单独处理倾斜key。
  • left semi join 代替join判断in和exists:hive0.13前不支持在where 中使用in嵌套查询是否exists,使用left semi join代替join。

参数配置上的优化

        小表join时尽量开启map join

set hive.auto.convert.join=true;  -- 版本0.11.0之后,默认是开启状态的,但时不时会把这个配置关闭,所以最好还是手动配置一下
set hive.mapjoin.smalltable.filesize=25000000;  -- 默认是25Mb开启mapjoin,对于稍微超过这大小的,可以适当调大,但不能太大

调整map数

        如果输入文件是少量大文件,就减少mapper数;如果输入文件是大量大文件,就增大mapper数;如果是大量的小文件就先合并小文件。

set mapred.min.split.size=10000;  -- 最小分片大小
set mapred.max.split.size=10000000;  -- 最大分片大小
set mapred.map.tasks=100;  -- 设置map task任务数
map任务数计算规则:map_num = MIN(split_num, MAX(default_num, mapred.map.tasks)),

合并小文件        

set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;  -- 输入阶段合并小文件
set hive.merge.mapredfiles=true;  -- 输出阶段小文件合并
set hive.merge.mapfiles=true;  -- 开启map端合并小文件,默认开启
set hive.merge.mapredfiles=true;  -- 开启reduce端合并小文件
set hive.merge.smallfiles.avgsize=16000000;  -- 平均文件大小,默认16M,满足条件则自动合并,只有在开启merge.mapfiles和merge.mapredfiles两个开关才有效

启用压缩

set hive.exec.compress.intermediate=true;  -- 开启输入压缩
set hive.exec.compress.output=true;  -- 开启输出压缩
set sethive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;  -- 使用Snappy压缩
set mapred.output.compreession.codec=org.apache.hadoop.io.compress.GzipCodec;  -- 使用Gzip压缩
set hive.intermediate.compression.type=BLOCK;  -- 配置压缩对象 快或者记录

分桶设置

set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;

设置合适的数据存储格式

hive默认的存储格式是TextFile,但是这种文件格式不使用压缩,会占用比较大空间,目前支持的存储格式有SequenceFile、RCFile、Avro、ORC、Parquet,这些存储格式基本都会采用压缩方式,而且是列式存储,如果指定存储orc模式;

ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
并行化执行

并行化执行

每个查询被hive转化成多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间,主要针对uoion 操作

set hive.exec.parallel=true;  -- 开启并行模式
set hive.exec.parallel.thread.numbe=8;  -- 设置并行执行的线程数

本地化执行

本地模式主要针对数据量小,操作不复杂的SQL。

set hive.exec.mode.local.auto;  -- 开启本地执行模模式
需要满足的条件:
job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
job的reduce数必须为0或者1

使用严格模式

严格模式主要是防范用户的不规范操作造成集群压力过大,甚至是不可用的情况,只对三种情况起左右,分别是查询分区表是不指定分区;两表join时产生笛卡尔积;使用了order by 排序但是没有limit关键字。

set hive.mapred.mode=strict;  -- 开启严格模式

map端预聚合

预聚合的配置项是
set hive.map.aggr=true;  -- group by时,如果先起一个combiner在map端做部分预聚合,使用这个配置项可以有效减少shuffle数据量,默认值true
set hive.groupby.mapaggr.checkinterval=100000;  -- 也可以设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000

倾斜均衡配置项

set hive.groupby.skewindata=false;  -- group by时如果某些key对应的数据量过大,就会发生数据倾斜。Hive自带了一个均衡数据倾斜的配置项,默认值false

动态分区配置

set hive.exec.dynamic.partition=false;  -- 是否开启动态分区功能,默认false关闭
set hive.exec.dynamic.partition.mode=strict;  -- 动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区
set hive.exec.max.dynamic.partitions.pernode=100;  -- 在每个执行MR的节点上,最大可以创建多少个动态分区,根据实际的数据来设定,比如hour必须大于等于24,day必须大于365
set hive.exec.max.dynamic.partitions=1000;  -- 在所有执行MR的节点上,最大一共可以创建多少个动态分区
set hive.exec.max.created.files=100000;  -- 整个MR Job中,最大可以创建多少个HDFS文件
set hive.error.on.empty.partition=false;  -- 当有空分区生成时,是否抛出异常

JVM重用

set mapred.job.reuse.jvm.num.tasks=10;  -- 在MR job中,默认是每执行一个task就启动一个JVM。如果task非常小而碎,那么JVM启动和关闭的耗时就会很长。可以通过调节参数这个参数来重用。例如将这个参数设成5,就代表同一个MR job中顺序执行的10个task可以重复使用一个JVM,减少启动和关闭的开销。但它对不同MR job中的task无效。
08-27 02:46