Mapper

 package cn.hbase.mapreduce.hb2hdfs;

 import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper; /**
*
* @author Tele
*
* 输出key 行键 输出out 读出的一行数据
*/ public class ReadFruitFromHbMapper extends TableMapper<ImmutableBytesWritable, Result> { @Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
}

Reducer

 package cn.hbase.mapreduce.hb2hdfs;

 import java.io.IOException;

 import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; /**
*
* @author Tele
*
*/ public class WriteFruit2HdfsReducer extends Reducer<ImmutableBytesWritable, Result, NullWritable, Text> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context)
throws IOException, InterruptedException {
for (Result result : values) {
CellScanner scanner = result.cellScanner();
while (scanner.advance()) {
Cell cell = scanner.current();
Text text = new Text();
// 封装数据
String row = Bytes.toString(CellUtil.cloneRow(cell)) + "\t";
String cf = Bytes.toString(CellUtil.cloneFamily(cell)) + "\t";
String cn = Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t";
String value = Bytes.toString(CellUtil.cloneValue(cell)) + "\t"; StringBuffer buffer = new StringBuffer();
buffer.append(row).append(cf).append(cn).append(value);
text.set(buffer.toString()); // 写出
context.write(NullWritable.get(), text);
} } }
}

Runner

 package cn.hbase.mapreduce.hb2hdfs;

 import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; /**
*
* @author Tele
*
*/ public class FruitRunner extends Configured implements Tool { public int run(String[] args) throws Exception { System.setProperty("HADOOP_USER_NAME", "tele");
// 实例化job
Job job = Job.getInstance(this.getConf()); // 设置jar
job.setJarByClass(FruitRunner.class); // 设置缓存行键
Scan scan = new Scan();
scan.setCaching(300);
      
// 组装mapper
TableMapReduceUtil.initTableMapperJob("fruit", scan, ReadFruitFromHbMapper.class, ImmutableBytesWritable.class,
Result.class, job);
// 组装reuder
job.setReducerClass(WriteFruit2HdfsReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class); FileOutputFormat.setOutputPath(job, new Path("/outputfruit")); // reduce个数
job.setNumReduceTasks(1); // 提交
return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
ToolRunner.run(conf, new FruitRunner(), args);
} }
05-11 08:19
查看更多