Using MapReduce to read data from an HBase table and save it to HDFS
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Hbase2Hdfs {

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Job job = Job.getInstance(config, "Hbase2HdfsMrTest");
        job.setJarByClass(Hbase2Hdfs.class);

        String hbaseTableName = "wordcount";
        Scan scan = new Scan();
        scan.setCaching(500);        // rows fetched per RPC; the default is too small for MR scans
        scan.setCacheBlocks(false);  // don't pollute the block cache with a one-off full scan

        TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, MyMapper.class, Text.class, Text.class, job);
        job.setNumReduceTasks(0);    // map-only job: the mapper's output is written straight to the output files
        FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));

        boolean b = job.waitForCompletion(true);
        if (b) {
            System.out.println("hbase to hdfs ok");
        }
    }
    /**
     * Mapper only, no reducer: the mapper's output is written directly to the output files.
     * @author caoxiangqian
     */
    public static class MyMapper extends TableMapper<Text, Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Each Result holds all cells of one row; emit one output line per cell.
            for (Cell cell : value.rawCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                String row = Bytes.toString(CellUtil.cloneRow(cell));
                String rowValue;
                if ("count".equals(qualifier)) {
                    // the count column stores an 8-byte long, so decode it with Bytes.toLong
                    rowValue = Bytes.toLong(CellUtil.cloneValue(cell)) + "";
                } else {
                    rowValue = Bytes.toString(CellUtil.cloneValue(cell));
                }
                System.out.println(
                        "family: " + family + ", qualifier:" + qualifier + ", rowKey:" + row + ", value: " + rowValue);
                context.write(new Text(
                        "rowKey=" + row + ",family=" + family + ",qualifier=" + qualifier + ",value=" + rowValue),
                        new Text(""));
            }
        }
    }
}
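
To submit the job, the HBase client jars need to be on the Hadoop client classpath; a typical invocation looks like the following (the jar name hbase2hdfs.jar is only a placeholder for whatever your build produces). initTableMapperJob also ships the HBase dependency jars with the job by default, so only the submitting JVM needs the extra classpath.

[hadoop@hadoop001 mrtest]$ export HADOOP_CLASSPATH=$(hbase classpath)
[hadoop@hadoop001 mrtest]$ hadoop jar hbase2hdfs.jar Hbase2Hdfs
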
[hadoop@hadoop001 mrtest]$ hdfs dfs -text /tmp/mr/mySummaryFile/*
rowKey=a,family=cf,qualifier=count,value=4
rowKey=a,family=cf,qualifier=word,value=a
rowKey=am,family=cf,qualifier=count,value=10
rowKey=am,family=cf,qualifier=word,value=am
rowKey=boy,family=cf,qualifier=count,value=2
rowKey=boy,family=cf,qualifier=word,value=boy
rowKey=caoxiangqian,family=cf,qualifier=count,value=1
rowKey=caoxiangqian,family=cf,qualifier=word,value=caoxiangqian
rowKey=go,family=cf,qualifier=count,value=6918
rowKey=go,family=cf,qualifier=word,value=go
rowKey=good,family=cf,qualifier=count,value=3
rowKey=good,family=cf,qualifier=word,value=good
rowKey=haha,family=cf,qualifier=count,value=3624
rowKey=haha,family=cf,qualifier=word,value=haha
rowKey=hello,family=cf,qualifier=count,value=7398
rowKey=hello,family=cf,qualifier=word,value=hello
- With only a mapper, the number of output files is hard to control (each map task writes its own part file); adding a reducer and setting the number of reduce tasks lets you control how many output files are produced, as sketched below.
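
A minimal sketch of that variant, assuming the same Text/Text mapper output as above (the MyReducer class and the reduce-task count of 2 are made-up for illustration): a pass-through reducer forwards each key unchanged, and job.setNumReduceTasks(n) then determines how many output files are produced.

// Requires: import org.apache.hadoop.mapreduce.Reducer;
// Hypothetical pass-through reducer: each reduce task writes one part-r-NNNNN file,
// so the number of reduce tasks determines the number of output files.
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Forward each key unchanged; the empty value keeps the same line format as the map-only job.
        context.write(key, new Text(""));
    }
}

// In main(), after initTableMapperJob(...), replace job.setNumReduceTasks(0) with:
// job.setReducerClass(MyReducer.class);
// job.setOutputKeyClass(Text.class);
// job.setOutputValueClass(Text.class);
// job.setNumReduceTasks(2); // e.g. two output files instead of one per map task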