我知道可以在一个 Job 中通过多个 Mapper 把数据写入多张 HBase 表,但似乎没有找到别人这样做的示例,所以把我的做法写出来验证一下。
map1:
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.hbase.client.Put;
// Emits each qualifying input line as a Put destined for table 表1.
// Contract for MultiTableOutputFormat: the output KEY carries the destination
// table name; the row key and cell data travel inside the Put VALUE.
// (表1 / 逻辑 / 主键名 / 列族名 / 列名 / 值 are pseudo-code placeholders.)
static class Mapper1 extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put>{
// Destination table name, wrapped once and reused for every record emitted.
private ImmutableBytesWritable tbl1 = new ImmutableBytesWritable (Bytes.toBytes(表1));
@Override
public void map(LongWritable key ,Text value, Context context) throws IOException,InterruptedException{
// Placeholder for the per-record filtering condition.
if(逻辑){
// Placeholder for deriving the HBase row key from this record.
byte[] rowKey = Bytes.toBytes(主键名);
Put p =new Put(rowKey);
// Placeholder column family / qualifier / value for the cell being written.
p.addColumn(Bytes.toBytes(列族名),Bytes.toBytes(列名),Bytes.toBytes(值));
// Details of the various record-specific operations omitted.
context.write(tbl1,p);
}
}
}
然后是map2,也是一样的:
// Same pattern as Mapper1, but routes its Puts to table 表2 by emitting
// that table's name as the output key. MultiTableOutputFormat dispatches
// each (table name, Put) pair to the named table.
// (表2 / 逻辑 / 主键名 / 列族名 / 列名 / 值 are pseudo-code placeholders.)
static class Mapper2 extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put>{
// Destination table name, wrapped once and reused for every record emitted.
private ImmutableBytesWritable tbl2 = new ImmutableBytesWritable (Bytes.toBytes(表2));
@Override
public void map(LongWritable key ,Text value, Context context) throws IOException,InterruptedException{
// Placeholder for the per-record filtering condition.
if(逻辑){
// Placeholder for deriving the HBase row key from this record.
byte[] rowKey = Bytes.toBytes(主键名);
Put p =new Put(rowKey);
// Placeholder column family / qualifier / value for the cell being written.
p.addColumn(Bytes.toBytes(列族名),Bytes.toBytes(列名),Bytes.toBytes(值));
// Details of the various record-specific operations omitted.
context.write(tbl2,p);
}
}
}
Driver类:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// Map-only driver: two input paths feed two mappers, and every record they
// emit is routed by MultiTableOutputFormat to the HBase table named in its
// ImmutableBytesWritable output key.
static class HBaseDriver extends Configured implements Tool{
    @Override
    public int run(String[] strings) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (strings.length < 2) {
            System.err.println("Usage: HBaseDriver <input path 1> <input path 2>");
            return 2;
        }
        Job job = Job.getInstance(getConf(), getClass().getSimpleName());
        job.setJarByClass(getClass());
        Configuration conf = job.getConfiguration();
        conf.set("hbase.zookeeper.quorum", "master");
        // Where HBase stores its data on HDFS; mirrors hbase-site.xml.
        conf.set("hbase.rootdir", "hdfs://master:9000/hbase");
        // Other configuration settings omitted.
        Path path1 = new Path(strings[0]);
        Path path2 = new Path(strings[1]);
        // One mapper per input path; both emit (destination table name, Put).
        MultipleInputs.addInputPath(job, path1, TextInputFormat.class, Mapper1.class);
        MultipleInputs.addInputPath(job, path2, TextInputFormat.class, Mapper2.class);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        // Declare the types the mappers actually emit; the framework defaults
        // (LongWritable/Text) do not match ImmutableBytesWritable/Put.
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        // Map-only job: mutations are written directly from the mappers.
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new HBaseDriver(), args);
        System.exit(exitCode);
    }
}
job.setOutputFormatClass(MultiTableOutputFormat.class); 这个设置能够输出到多张表:把 map 的输出类型改成 ImmutableBytesWritable(存放目标表名)和 Put(存放行键以及要写入的内容)即可。
而 job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "表名") 配合 TableOutputFormat 只能输出到某一张表中。
当然,如果说表的名字想也写成动态的,可以啊。
在Driver类中,加入:
conf.set("table1",args[2]);
conf.set("table2",args[3]);
这两行的位置要靠前,在MultipleInputs之前
然后在Mapper类中加入如下:
// Reads the destination table name from the job configuration, so the target
// table can be supplied on the command line instead of being hard-coded.
@Override
public void setup(Context context){
    // Fix: ImmutableBytesWritable wraps byte[], not String — the configured
    // table name must be converted with Bytes.toBytes(...) first.
    tbl1 = new ImmutableBytesWritable(Bytes.toBytes(context.getConfiguration().get("table1")));
}
类似如此,其他的诸如列族名,列名,也可以通过这样的方式去设置。