To limit the number of rows a Hive query returns, use the LIMIT keyword, e.g. select columnname1 from table1 limit 10; Hive will then output at most 10 records matching the query. Fundamentally, Hive is a client that submits jobs to Hadoop: it parses the SQL with the ANTLR lexer/parser toolkit, analyzes and optimizes it, translates it into a series of MapReduce jobs, and submits those jobs to Hadoop to produce the result.
Let's start with a simple SQL statement:
hive> select deviceid from t_aa_pc_log where pt='2012-07-07-00' limit 1;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201205162059_1547550, Tracking URL = http://jt.dc.sh-wgq.sdo.com:50030/jobdetails.jsp?jobid=job_201205162059_1547550
Kill Command = /home/hdfs/hadoop-current/bin/hadoop job -Dmapred.job.tracker=10.133.10.103:50020 -kill job_201205162059_1547550
2012-07-07 16:22:42,570 Stage-1 map = 0%, reduce = 0%
2012-07-07 16:22:48,628 Stage-1 map = 80%, reduce = 0%
2012-07-07 16:22:49,640 Stage-1 map = 100%, reduce = 0%
2012-07-07 16:22:50,654 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201205162059_1547550
OK
0cf49387a23d9cec25da3d76d6988546
Time taken: 13.499 seconds
hive> explain select deviceid from t_aa_pc_log where pt='2012-07-07-00' limit 1;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        t_aa_pc_log
          TableScan
            alias: t_aa_pc_log
            Filter Operator
              predicate:
                  expr: (pt = '2012-07-07-00')
                  type: boolean
              Select Operator
                expressions:
                      expr: deviceid
                      type: string
                outputColumnNames: _col0
                Limit
                  File Output Operator
                    compressed: false
                    GlobalTableId: 0
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: 1

Time taken: 0.418 seconds
The EXPLAIN output shows the Operator graph for the query: TableScan -> Filter -> Select -> Limit -> File Output. It is on top of this graph that Hive implements operations such as LIMIT, GROUP BY, and JOIN. The Operator base class defines the links and the completion flag that the rest of this walkthrough relies on.
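A minimal sketch of the relevant parts of org.apache.hadoop.hive.ql.exec.Operator, based on the Hive 0.x era source (exact declarations vary by version):

// Sketch of org.apache.hadoop.hive.ql.exec.Operator (Hive 0.x era);
// declarations may differ in other versions.
public abstract class Operator<T extends Serializable> {
  // links that form the operator graph
  protected List<Operator<? extends Serializable>> childOperators;
  protected List<Operator<? extends Serializable>> parentOperators;

  // completion flag: once true, no more rows are fed to this operator
  private boolean done;

  public boolean getDone() { return done; }
  public void setDone(boolean done) { this.done = done; }

  // process() hands one row to this operator; each subclass
  // (TableScanOperator, LimitOperator, ...) implements processOp()
  public void process(Object row, int tag) throws HiveException { /* ... */ }
  protected abstract void processOp(Object row, int tag) throws HiveException;
}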
On the map side, Hadoop's MapRunner loops over the input, calling CombineHiveRecordReader's doNext method to read one row at a time until doNext returns false. doNext contains the key piece of logic that decides whether reading should stop; we will come back to that check after looking at ExecMapper.
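For context, the driving loop in Hadoop's old-API MapRunner looks roughly like this (simplified from the Hadoop 1.x source; counters and skip-record handling removed):

// Simplified from org.apache.hadoop.mapred.MapRunner (Hadoop 1.x era).
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter) throws IOException {
  try {
    // key/value instances are re-used for every record
    K1 key = input.createKey();
    V1 value = input.createValue();
    // input.next() ends up in CombineHiveRecordReader.doNext();
    // the loop stops as soon as it returns false
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
    }
  } finally {
    mapper.close();
  }
}

The mapper here is Hive's ExecMapper, whose map method feeds each row into mo, the root MapOperator of the operator tree: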
public void map(Object key, Object value, OutputCollector output,
    Reporter reporter) throws IOException {
  if (oc == null) {
    oc = output;
    rp = reporter;
    mo.setOutputCollector(oc);
    mo.setReporter(rp);
  }
  // reset the execContext for each new row
  execContext.resetRow();

  try {
    if (mo.getDone()) {
      // the operator tree has already seen enough rows (e.g. the
      // limit was hit), so mark the whole mapper as done
      done = true;
    } else {
      // Since there is no concept of a group, we don't invoke
      // startGroup/endGroup for a mapper
      mo.process((Writable) value);
    }
  } catch (Throwable e) {
    // remainder of the method (row counters, error handling) elided
    abort = true;
    throw new RuntimeException(e);
  }
}
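The "important logic" in doNext mentioned above is the check against that done flag: Hive's record reader consults ExecMapper before fetching the next row. A sketch, based on CombineHiveRecordReader in the Hive 0.x source (names may vary by version):

// Sketch of CombineHiveRecordReader.doNext (Hive 0.x era).
public boolean doNext(K key, V value) throws IOException {
  if (ExecMapper.getDone()) {
    // ExecMapper.map() flipped the flag: report end-of-input so the
    // MapRunner loop exits, even if the split has unread data left
    return false;
  }
  return recordReader.next(key, value);
}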
Next, let's look at how each Operator decides that it has finished. The following loop sits in the Operator base class (in the forward method, which hands a row on to all child operators):
int childrenDone = 0;
for (int i = 0; i < childOperatorsArray.length; i++) {
  Operator<? extends Serializable> o = childOperatorsArray[i];
  if (o.getDone()) {
    childrenDone++;
  } else {
    o.process(row, childOperatorsTag[i]);
  }
}

// if all children are done, this operator is also done
if (childrenDone == childOperatorsArray.length) {
  setDone(true);
}
Each Operator checks whether all of its child Operators are done; if they are, it sets its own done flag to true as well, so completion propagates from the leaves of the tree up toward the root MapOperator.
Finally, look at the judgment logic in LimitOperator, sketched below.
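A sketch of LimitOperator.processOp, based on the Hive 0.x era source (exact code may differ by version):

// Sketch of LimitOperator.processOp (Hive 0.x era source).
@Override
public void processOp(Object row, int tag) throws HiveException {
  if (currCount < limit) {
    // still under the limit: pass the row on to the child operators
    forward(row, inputObjInspectors[tag]);
    currCount++;
  } else {
    // limit reached: mark this operator done; the flag propagates up
    // the tree, ExecMapper.map() then sets done, doNext() returns
    // false, and the map task stops reading its input early
    setDone(true);
  }
}

This closes the loop: LIMIT is enforced not by trimming the output afterwards, but by shutting the read pipeline down as soon as enough rows have been produced, which is why a LIMIT query can finish after reading only part of its input.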
This article comes from the "yyj0531" blog, http://yaoyinjie.blog.51cto.com/3189782/923378
[Nice! Thanks to the author for sharing; it resolved my questions while studying Hive internals.]