package com.hbase.mapreduce;
import java.io.IOException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author:FengZhen
* @create:2018年9月14日
*/
public class ImportFromFile extends Configured implements Tool{
private static String addr="HDP233,HDP232,HDP231";
private static String port="2181";
public static final String NAME = "ImportFromFile";
public enum Counters { LINES }
static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private byte[] family = null;
private byte[] qualifier = null;
@Override
protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
String column = context.getConfiguration().get("conf.column");
byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
family = colkey[0];
if (colkey.length > 1) {
qualifier = colkey[1];
}
}
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
try {
String lineString = value.toString();
//行键是经过MD5散列之后随机生成的键值
byte[] rowkey = DigestUtils.md5(lineString);
Put put = new Put(rowkey);
//存储原始数据到给定的表中的一列
put.addColumn(family, qualifier, Bytes.toBytes(lineString));
context.write(new ImmutableBytesWritable(rowkey), put);
context.getCounter(Counters.LINES).increment(1L);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 使用Apache Commons CLI类解析命令行参数。
* @param args
* @return
*/
private static CommandLine parseArgs(String[] args) {
Options options = new Options();
Option option = new Option("t", "table", true, "table to import into -must exist");
option.setArgName("table-name");
option.setRequired(true);
options.addOption(option);
option = new Option("c", "column", true, "column to store row data into -must exit");
option.setArgName("family:qualifier");
option.setRequired(true);
options.addOption(option);
option = new Option("i", "input", true, "the directory or file to read from");
option.setArgName("path-in-HDFS");
option.setRequired(true);
options.addOption(option);
options.addOption("d", "debug", false, "switch on DEBUG log level");
CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
e.printStackTrace();
System.err.println("ERROR: " + e.getMessage() + "\n");
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(NAME + " ", options, true);
System.exit(1);
}
return cmd;
}
public int run(String[] arg0) throws Exception {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum",addr);
configuration.set("hbase.zookeeper.property.clientPort", port);
//String[] otherArgs = new GenericOptionsParser(configuration, arg0).getRemainingArgs();
//CommandLine commandLine = parseArgs(arg0);
// String table = commandLine.getOptionValue("t");
// String input = commandLine.getOptionValue("i");
// String column = commandLine.getOptionValue("c");
String table = arg0[0];
String input = arg0[1];
String column = arg0[2];
configuration.set("conf.column", column);
Job job = Job.getInstance(configuration);
job.setJobName("ImportFromFile");
job.setJarByClass(ImportFromFile.class);
job.setMapperClass(ImportMapper.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
//这是一个只包含map阶段的作业,框架会直接跳过reduce阶段
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(input));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
String[] params = new String[] {"test_table_mr", "hdfs://fz/data/fz/input/hbase", "data:info"};
int exitCode = ToolRunner.run(new ImportFromFile(), params);
System.exit(exitCode);
}
}