MapReduce输入数据源不仅可以是文件还可以是Hbase,从Hbase读取数据需要注意以下细节

1、Mapper类需要从TableMapper继承,并且实现函数
void map(ImmutableBytesWritable key, Result columns, Context context)
ImmutableBytesWritable key 实际上是Hbase表记录的rowkey,
Result columns 是Hbase表记录的字段集合。通过如下方式获取字段值,
其中RR表示列簇,
createDate是要获取的字段
String createDate = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("createDate")));
2、在驱动程序中需要做如下操作
A)、设置hbase地址和端口       
    Configuration conf = HBaseConfiguration.create();
        //设置HbaseIP地址
        conf.set("hbase.zookeeper.quorum", "192.168.10.100");
       //设置Hbase port
    conf.set("hbase.zookeeper.property.clientPort", String.valueOf(2181));
B)、设置扫描器及caching,这样避免MapReduce输出日志过多导致MR失败
Scan scan = new Scan();
String startRowKey = "ROM.01.";
scan.setStartRow(Bytes.toBytes(startRowKey));
scan.setCaching(3000);
C)、设置要读取的hbase表、扫描器,Mapper类及输出key,输出value
TableMapReduceUtil.initTableMapperJob("report", scan, DetailMapper.class, Text.class, Text.class, job);

具体实现如下:


点击(此处)折叠或打开

  1. import org.apache.commons.cli.*;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.apache.commons.logging.Log;
  4. import org.apache.commons.logging.LogFactory;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.hbase.HBaseConfiguration;
  8. import org.apache.hadoop.hbase.client.Result;
  9. import org.apache.hadoop.hbase.client.Scan;
  10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  11. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  12. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  13. import org.apache.hadoop.hbase.util.Bytes;
  14. import org.apache.hadoop.io.NullWritable;
  15. import org.apache.hadoop.io.Text;
  16. import org.apache.hadoop.mapreduce.Job;
  17. import org.apache.hadoop.mapreduce.Reducer;
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  19. import org.apache.hadoop.util.GenericOptionsParser;

  20. import java.io.IOException;


  21. public class DetailJob {
  22.     private static final Log log = LogFactory.getLog(DetailJob.class);
  23.     private static final String NAME = "DetailJob";

  24.     public static class DetailMapper extends TableMapper<Text, Text> {
  25.         private String key="";
  26.         @Override
  27.         protected void setup(Context context){
  28.             key = context.getConfiguration().get("key");
  29.             System.out.println("setup key:"+key);
  30.         }
  31.         @Override
  32.         protected void map(ImmutableBytesWritable key, Result columns, Context context) throws IOException,
  33.                 InterruptedException {
  34.             String emmcId = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("emmcid")));
  35.             String cpuId = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("cpuid")));
  36.             String osVersion = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("rom")));
  37.             String createDate = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("createDate")));
  38.             String ram = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("ram")));

  39.             StringBuilder sb = new StringBuilder();
  40.             sb.append(emmcId).append("\t");
  41.             sb.append(cpuId ).append("\t");
  42.             sb.append(osVersion).append("\t");
  43.             sb.append(ram).append("\t");
  44.             sb.append(createDate);
  45.             context.write(new Text(cpuId), new Text(sb.toString()));
  46.         }
  47.     }

  48.     public static class DetailReducer extends Reducer<Text, Text,
  49.             NullWritable,
  50.             Text> {
  51.         @Override
  52.         protected void reduce(Text key, Iterable<Text> values, Context context) throws
  53.                 IOException, InterruptedException {
  54.             for (Text val : values) {
  55.                 context.write(NullWritable.get(), new Text(val.toString()));
  56.             }
  57.         }
  58.     }

  59.     private static CommandLine parseArgs(String[] args) {
  60.         Options options = new Options();
  61.         Option o = new Option("d", "table", true,
  62.                 "table to read from (must exist)");
  63.         o.setArgName("table-name");
  64.         o.setRequired(true);
  65.         options.addOption(o);

  66.         o = new Option("o", "output", true,
  67.                 "the directory to write to");
  68.         o.setArgName("path-in-HDFS");
  69.         o.setRequired(true);
  70.         options.addOption(o);

  71.         o = new Option("k", "key", true,
  72.                 "the directory to write to");
  73.         o.setArgName("path-in-HDFS");
  74.         o.setRequired(true);
  75.         options.addOption(o);

  76.         CommandLineParser parser = new PosixParser();
  77.         CommandLine cmd = null;
  78.         try {
  79.             cmd = parser.parse(options, args);
  80.         } catch (Exception e) {
  81.             System.err.println("ERROR: " + e.getMessage() + "\n");
  82.             HelpFormatter formatter = new HelpFormatter();
  83.             formatter.printHelp(NAME + " ", options, true);
  84.             System.exit(-1);
  85.         }
  86.         return cmd;
  87.     }

  88.     public static void main(String[] args) throws Exception {
  89.         Configuration conf = HBaseConfiguration.create();
  90.         //设置HbaseIP地址
  91.         conf.set("hbase.zookeeper.quorum", "192.168.10.100");
  92.        //设置Hbase port
  93.         conf.set("hbase.zookeeper.property.clientPort", String.valueOf(2181));

  94.         String[] otherArgs =
  95.                 new GenericOptionsParser(conf, args).getRemainingArgs();
  96.         CommandLine cmd = parseArgs(otherArgs);
  97.         String date = cmd.getOptionValue("d");
  98.         String output = cmd.getOptionValue("o");

  99.         String key = cmd.getOptionValue("k");
  100.         conf.set("key",key);

  101.         //设置Hbase扫描数据的起始row
  102.        // String date = cmd.getOptionValue("d");
  103.         Scan scan = new Scan();
  104.         if (date != null && date.trim().length() > 0) {
  105.             //String nextDayStr = getNextDayStr(date);
  106.             String startRowKey = "ROM.01." + date.replace("-", "");
  107.             //String endRowKey = "ROM.01." + nextDayStr.replace("-", "");
  108.             scan.setStartRow(Bytes.toBytes(startRowKey));
  109.            // scan.setStopRow(Bytes.toBytes(endRowKey));
  110.         }
  111.         scan.setCaching(3000);

  112.         Job job = new Job(conf, NAME);
  113.         job.setJarByClass(DetailJob.class);
  114.         //设置要读取的hbase表、扫描器,Mapper类及输出key,输出value
  115.         TableMapReduceUtil.initTableMapperJob("report", scan, DetailMapper.class, Text.class, Text.class, job);
  116.         job.setReducerClass(DetailReducer.class);
  117.         job.setOutputKeyClass(NullWritable.class);
  118.         job.setOutputValueClass(Text.class);
  119.         job.setNumReduceTasks(1);

  120.         FileOutputFormat.setOutputPath(job, new Path(output));
  121.         job.waitForCompletion(true);
  122.     }


09-25 16:38