1. 使用“连接池”
如果每次和Hbase交互时都去新建连接的话,显然是低效率的,HBase也提供类连接池相关的API。
1.1. HTablePool
早期的API中使用它,但很不幸,现在它已经过时了。在次不再描述。
1.2. HConnection
取代HTablePool的就是现在的HConnection,可以通过它拿到几乎所有关于HBase的相关操作对象。
private static HConnection connection = null;
private static Configuration conf =null; static{
try {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort", "");
conf.set("hbase.zookeeper.quorum", "Hadoop-master01,Hadoop-slave01,Hadoop-slave02"); connection = HConnectionManager.createConnection(getHBaseConfiguration());
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
}
}
2. 读 优化
2.1. 根据rowkey
如果本操作中只有一个rowkey的话,大可以使用下边的方式(单个读):
byte[] rowkey = new byte[]{......};
Get get = new Get(rowkey);
Result result = destTable.get(get);
若有多个rowkey的话,可以使用如下方式(批量读):
List<byte[]> rowList = new ArrayList<byte[]>();
List<Get> gets = new ArrayList<Get>();
for(byte[] row:rowList){
gets.add(new Get(row:));
}
Result[] results = destTable.get(gets);
2.2. 使用Scan
Scan scan = new Scan();
ResultScanner resultScanner = srcTable.getScanner(scan);
可以通过设置hbase.client.scanner.caching参数来设置resultScanner从服务器一次抓取的数据条数。默认是一次一条,这样可以大大的增加结果集游标移动的效率(resultScanner.next())。
设置这个参数的方法有三个:
- HBase的conf配置文件hdfs-site.xml里可以配置
- 表的对象:hTable.setScannerCaching(10000);
- 扫面器对象:scan.setCaching(10000);
另外,还可以通过:
scan.addColumn(Bytes.toBytes("sm"), Bytes.toBytes("ip"));
设置扫描的列,减少不必要的网络流量,提升读表效率。
3. 写 优化
写数据的操作中每条提交一个put,其中包含了rowkey,还有对于的一列或多列值。
3.1. 写入单条数据
byte[] row = Bytes.toBytes(...);
Put put = new Put(row);
put.add(Bytes.toBytes(...), Bytes.toBytes(...), Bytes.toBytes(...)); table.put(put);
table.flushCommits();
其中,table.put(put)是把数据提交到HDFS里,执行了table.flushCommits()之后,将会把数据提交到HBase中。
3.2. 写入多条数据
在写入多条数据时,就会涉及到数据提交和缓存的问题,具体如下:
- 客户端维护缓存
使用HTable.setAutoFlush(true)设置客户端写入数据时自动维护缓存,当数据达到缓存上限时自动提交数据,这个参数默认是开启的。设置客户端自行维护缓存时,可更具需求来设置缓存的大小,HTable.setWriteBufferSize(writeBufferSize)。
但是在实际开发中,并不提倡这种方法。原因是每次table.put(put)去连接hdfs的时间开销是频繁的,不适合大吞吐量的批量写入。
- 手动维护缓存
可以把要写入的数据先放入本地内存中,然后使用table.put(List<Put>)来提交数据。这样来减少客户端和集群的交互次数,提高传输的吞吐量。
List<Put> puts = new ArrayList<Put>();
for(int i=; i<; i++){
byte[] rowkey = Bytes.toBytes(RandomStringUtils.random(,"ABCDESSSSS"));
byte[] value = Bytes.toBytes(RandomStringUtils.random(,"IOJKJHHJNNBGHIKKLM<NH"));
Put put = new Put(rowkey);
put.add(Bytes.toBytes(FAMILY_CF), Bytes.toBytes("value"), value);
puts.add(put);
if(i%==){
table.put(puts);
table.flushCommits();
puts.clear();
}
}
3.3. 自增列
destTable.incrementColumnValue(rowkey, Bytes.toBytes(FAMILY_CF), Bytes.toBytes("testIncrement"),Long.parseLong("") ,true);
往testIncrement列自增1.在批处理系统中,这种使用方法需要慎用,它每次执行都会提交数据,不能实现这一列的批量提交。