分析:
1、由于是任意列 任意表 任意路径,我们很容易想到是参数传入,参数传入后怎么去获得参数,根据我们以往的经验就是通过args[]来获取,但是在mapper或者是reducer中,我们不能直接将参数传入,因为map 和reduce是通过反射机制来创建的,对于传入的参数我们不能直接使用;我们发现在map和reduce有一个参数context,此类中包含很多的信息,例如configuration,并且configuration 还有set()方法,因此 我们可以将参数传给conf,然后由context拿到conf,进而拿到参数/
2、表头在第一行,有且只有一行,因此可以将表头的信息写入mapper 的setup()方法,因为它只进行一次操作
代码:
定义最后的程序运行为 hadoop jar **.jar t1 /tt "f1:c1|f1:c2|f2:c3"
对"f1:c1|f1:c2|f2:c3"进行拆分,首先按照"|"进行拆分,在java中 split("\\|"),因为|是转义字符,然后对每个f:c 再按照":"拆分,此时我们就拿到了单独的列族和列
定义自己的mapper函数(由于我们只是将数据读取出来,所以不用写reducer)
public static class MyMapper extends TableMapper<Text, Text> { @Override
protected void setup(Context context) throws IOException,
InterruptedException {
String familyscolumns = context.getConfiguration().get(
"familys:columns");
String[] familycolumns = familyscolumns.split("\\|");
String familycolumn = "";
for (String fc : familycolumns) {
familycolumn += fc + "\t";
}
// 在第一行增加header 行健 列族:列
context.write(new Text("rowkey"), new Text(familycolumn));
} Text k2 = new Text();
Text v2 = new Text(); @Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
k2.set(key.get());
String familyscolumns = context.getConfiguration().get(
"familys:columns");
String[] familycolumns = familyscolumns.split("\\|");
String familycolumn="";
Cell columnLatestCell1=null;
for (String string : familycolumns) {
String[] fc = string.split(":");
columnLatestCell1 = value.getColumnLatestCell(
fc[0].getBytes(), fc[1].getBytes());
if (columnLatestCell1 != null) {
familycolumn+=new String(columnLatestCell1.getValue())+"\t";
} else{
familycolumn+="\t";
} }
v2.set(new String(familycolumn));
context.write(k2, v2);
} }
然后在客户端提交job
并把参数写入conf
Configuration conf = HBaseConfiguration.create();
conf.set("table", args[0]);
conf.set("hdfsPath", args[1]);
conf.set("familys:columns", args[2]);
用mr实现将hadoop中的数据导入到hbase(我之前一直纠结是写在map还是reduce,其实无所谓啦)
public static void main(String[] args) throws Exception {
PropertyConfigurator.configure(ClassLoader
.getSystemResource("log4j.properties"));
final Configuration configuration = new Configuration();
// 设置zookeeper
configuration.set("hbase.zookeeper.quorum", "hadoop");
// 设置hbase表名称
configuration.set(TableOutputFormat.OUTPUT_TABLE, "t2");
// 将该值改大,防止hbase超时退出
configuration.set("dfs.socket.timeout", "180000"); final Job job = Job.getInstance(configuration,
MR2HB.class.getSimpleName());
TableMapReduceUtil.addDependencyJars(job);
job.setJarByClass(MR2HB.class); job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0);
// 设置map的输出,不设置reduce的输出类型
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class);
// 不再设置输出路径,而是设置输出格式类型
job.setOutputFormatClass(TableOutputFormat.class); FileInputFormat.setInputPaths(job, "hdfs://hadoop:9000/part-m-00000"); job.waitForCompletion(true); } public static class MyMapper extends
Mapper<LongWritable, Text, NullWritable, Put> {
@Override
protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
Log.info(split[0]); Put put = new Put(Bytes.toBytes(split[0]));
put.add(Bytes.toBytes("f1"), Bytes.toBytes("age"),
Bytes.toBytes(split[1]));
if (split.length == 4) {
put.add(Bytes.toBytes("f2"), Bytes.toBytes("address"),
Bytes.toBytes(split[2]));
put.add(Bytes.toBytes("f1"), Bytes.toBytes("name"),
Bytes.toBytes(split[3]));
}
context.write(NullWritable.get(), put);
} }