一直有一个问题,今天调查了一下源码算是明白了。

===问题===

通过java api(如下代码所示)在创建表的时候,可以通过setMemStoreFlushSize函数来指定memstore的大小,

在集群配置文件中,也可以通过配置hbase.hregion.memstore.flush.size来指定memstore大小。

这两个地方指定的memestore的有什么区别和关联?

★参考代码

package api;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType; public class create_table_sample1 {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin(); HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t1"));
desc.setMemStoreFlushSize(2097152L); //2M(默认128M)
desc.setMaxFileSize(10485760L); //10M(默认10G) HColumnDescriptor family1 = new HColumnDescriptor(constants.COLUMN_FAMILY_DF.getBytes());
family1.setTimeToLive(2 * 60 * 60 * 24); //过期时间
family1.setMaxVersions(2); //版本数
desc.addFamily(family1);
HColumnDescriptor family2 = new HColumnDescriptor(constants.COLUMN_FAMILY_EX.getBytes());
family2.setTimeToLive(3 * 60 * 60 * 24); //过期时间
family2.setMinVersions(2); //最小版本数
family2.setMaxVersions(3); //版本数
family2.setBloomFilterType(BloomType.ROW); //布隆过滤方式
desc.addFamily(family2); admin.createTable(desc);
admin.close();
connection.close();
}
}

===解答===

源码位置:hbase-1.3.1\hbase-server\src\main\java\org\apache\hadoop\hbase\regionserver\

文件名:HRegion.java

函数名:setHTableSpecificConf

调用位置:HRegion类的构造函数

函数内容:

  void setHTableSpecificConf() {
if (this.htableDescriptor == null) return;
long flushSize = this.htableDescriptor.getMemStoreFlushSize(); if (flushSize <= 0) {
flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
}
this.memstoreFlushSize = flushSize;
this.blockingMemStoreSize = this.memstoreFlushSize *
conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
}

从上面的源代码中可以得到如下结论:

1、HRegion(每个Table会分为很多个HRegion分布在不同的HRegionServer中)对象在创建时,会初始化memstoreFlushSize。

2、它的计算首先是由Table决定的,即每个表可以设定自己的memstoreFlushSize。

通过关键字MEMSTORE_FLUSHSIZE来设定,或通过HTableDescriptor类中的setMemStoreFlushSize()方法来设定。

3、如果表中未设定,则通过集群参数hbase.hregion.memstore.flush.size来初始化。

4、如果集群参数也未配置的话,则默认为1024*1024*128L,即128M。

所以,可以为不同的表配置不同的MemStore大小。需要在创建表的时候指定。

如果表未单独配置,则采用集群的统一配置。默认128M。

===扩展===

上面setHTableSpecificConf的源代码中,还进行了blockingMemStoreSize的初期化,这个参数是什么呢?

从代码中可以看到,这个参数来源于集群配置项hbase.hregion.memstore.block.multiplier。这个参数与hbase.hregion.memstore.flush.size息息相关。

参数作用:

当一个HRegion中的MemStore的总大小(包含多个Store)超过阈值后,会出发flush请求。

该参数是个倍数,表示一个HRegion的MemStore的总大小最大可以是“hbase.hregion.memstore.flush.size”的几倍。

如果超过这个值,则会阻塞该HRegion的写请求,等待flush。

HRegion.java中的put方法。调用了checkResources()

  @Override
public void put(Put put) throws IOException {
checkReadOnly(); // Do a rough check that we have resources to accept a write. The check is
// 'rough' in that between the resource check and the call to obtain a
// read lock, resources may run out. For now, the thought is that this
// will be extremely rare; we'll deal with it when it happens.
checkResources();
startRegionOperation(Operation.PUT);
try {
// All edits for the given row (across all column families) must happen atomically.
doBatchMutate(put);
} finally {
closeRegionOperation(Operation.PUT);
}
}

checkResources()方法内容如下:

  /*
* Check if resources to support an update.
*
* We throw RegionTooBusyException if above memstore limit
* and expect client to retry using some kind of backoff
*/
private void checkResources() throws RegionTooBusyException {
// If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return; if (this.memstoreSize.get() > this.blockingMemStoreSize) {
blockedRequestsCount.increment();
requestFlush();
throw new RegionTooBusyException("Above memstore limit, " +
"regionName=" + (this.getRegionInfo() == null ? "unknown" :
this.getRegionInfo().getRegionNameAsString()) +
", server=" + (this.getRegionServerServices() == null ? "unknown" :
this.getRegionServerServices().getServerName()) +
", memstoreSize=" + memstoreSize.get() +
", blockingMemStoreSize=" + blockingMemStoreSize);
}
}

--END--

05-28 19:34