A MapReduce job can take its input not only from files but also from HBase. When reading data from HBase, the following details need attention.
1. The Mapper class must extend TableMapper and implement the method
void map(ImmutableBytesWritable key, Result columns, Context context)
The ImmutableBytesWritable key is the row key of the HBase record, and Result columns is the set of columns of that record. A column value is read as shown below, where "RR" is the column family and createDate is the qualifier to fetch:
String createDate = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("createDate")));
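For reference, the row key itself can be read from the key parameter via key.get(), and getValue() returns null when the row has no such column, so a null check is safer before converting. A minimal sketch, using the same family and qualifier as above:
// inside map(): the ImmutableBytesWritable wraps the raw row key bytes
String rowKey = Bytes.toString(key.get());
// getValue() returns null if the column is absent from this row
byte[] raw = columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("createDate"));
String createDate = (raw == null) ? "" : Bytes.toString(raw);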
2. In the driver program, do the following:
A) Set the HBase ZooKeeper address and port:
Configuration conf = HBaseConfiguration.create();
//set the HBase ZooKeeper quorum address
conf.set("hbase.zookeeper.quorum", "192.168.10.100");
//set the HBase ZooKeeper client port
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(2181));
B) Create the Scan and set its caching; this avoids the job failing because of excessive scanner round trips and MapReduce log output:
Scan scan = new Scan();
String startRowKey = "ROM.01.";
scan.setStartRow(Bytes.toBytes(startRowKey));
scan.setCaching(3000);
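A stop row can bound the scan in the same way, and for full-table MapReduce scans it is usually recommended to disable block caching so the scan does not churn the region servers' read cache. A sketch, with the stop row as a placeholder:
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("ROM.01."));   // first row to read
scan.setStopRow(Bytes.toBytes("ROM.02."));    // exclusive end row (placeholder)
scan.setCaching(3000);                        // rows fetched per RPC from the region server
scan.setCacheBlocks(false);                   // commonly recommended for MapReduce scans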
C) Specify the HBase table to read, the scan, the Mapper class, and the mapper output key/value classes:
TableMapReduceUtil.initTableMapperJob("report", scan, DetailMapper.class, Text.class, Text.class, job);
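Depending on the HBase version, initTableMapperJob also has an overload with an addDependencyJars flag that controls whether the HBase jars are packaged and shipped with the job; this matters when the worker nodes do not have HBase on their classpath. Roughly:
// same call, explicitly asking HBase to ship its dependency jars with the job
TableMapReduceUtil.initTableMapperJob("report", scan, DetailMapper.class,
        Text.class, Text.class, job, true);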
The full implementation is as follows:
import org.apache.commons.cli.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
public class DetailJob {

    private static final Log log = LogFactory.getLog(DetailJob.class);
    private static final String NAME = "DetailJob";

    public static class DetailMapper extends TableMapper<Text, Text> {

        // value handed in by the driver through conf.set("key", ...); note it is
        // shadowed by the map() parameter of the same name and is not used below
        private String key = "";

        @Override
        protected void setup(Context context) {
            key = context.getConfiguration().get("key");
            System.out.println("setup key:" + key);
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result columns, Context context)
                throws IOException, InterruptedException {
            // read the columns of interest from the "RR" column family
            String emmcId = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("emmcid")));
            String cpuId = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("cpuid")));
            String osVersion = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("rom")));
            String createDate = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("createDate")));
            String ram = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("ram")));

            // emit a tab-separated record, keyed by cpuId
            StringBuilder sb = new StringBuilder();
            sb.append(emmcId).append("\t");
            sb.append(cpuId).append("\t");
            sb.append(osVersion).append("\t");
            sb.append(ram).append("\t");
            sb.append(createDate);
            context.write(new Text(cpuId), new Text(sb.toString()));
        }
    }
    public static class DetailReducer extends Reducer<Text, Text, NullWritable, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // write each record out as-is, dropping the cpuId key
            for (Text val : values) {
                context.write(NullWritable.get(), new Text(val.toString()));
            }
        }
    }
    private static CommandLine parseArgs(String[] args) {
        Options options = new Options();
        Option o = new Option("d", "date", true,
                "date (yyyy-MM-dd) used to build the scan start row");
        o.setArgName("date");
        o.setRequired(true);
        options.addOption(o);
        o = new Option("o", "output", true,
                "the directory to write to");
        o.setArgName("path-in-HDFS");
        o.setRequired(true);
        options.addOption(o);
        o = new Option("k", "key", true,
                "value passed to the mapper via the job configuration");
        o.setArgName("key");
        o.setRequired(true);
        options.addOption(o);
        CommandLineParser parser = new PosixParser();
        CommandLine cmd = null;
        try {
            cmd = parser.parse(options, args);
        } catch (Exception e) {
            System.err.println("ERROR: " + e.getMessage() + "\n");
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp(NAME + " ", options, true);
            System.exit(-1);
        }
        return cmd;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // HBase ZooKeeper quorum address
        conf.set("hbase.zookeeper.quorum", "192.168.10.100");
        // HBase ZooKeeper client port
        conf.set("hbase.zookeeper.property.clientPort", String.valueOf(2181));

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        CommandLine cmd = parseArgs(otherArgs);
        String date = cmd.getOptionValue("d");
        String output = cmd.getOptionValue("o");
        String key = cmd.getOptionValue("k");
        conf.set("key", key);

        // start row of the HBase scan; a stop row could bound it via scan.setStopRow(...)
        Scan scan = new Scan();
        if (date != null && date.trim().length() > 0) {
            String startRowKey = "ROM.01." + date.replace("-", "");
            scan.setStartRow(Bytes.toBytes(startRowKey));
        }
        scan.setCaching(3000);

        Job job = new Job(conf, NAME);
        job.setJarByClass(DetailJob.class);
        // HBase table to read, scan, Mapper class, and mapper output key/value classes
        TableMapReduceUtil.initTableMapperJob("report", scan, DetailMapper.class, Text.class, Text.class, job);
        job.setReducerClass(DetailReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }
}
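Assuming the class is packaged into a jar (the jar name below is a placeholder) and the HBase client jars are on the Hadoop classpath, the job could be launched roughly like this, with -d supplying the date for the start row, -o the HDFS output directory, and -k the value handed to the mapper:
hadoop jar detail-job.jar DetailJob -d 2015-06-01 -o /tmp/detail-output -k somekey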