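I. Every HBase client operation goes through a Connection, so the analysis starts at the entry point that creates one:
ConnectionFactory.createConnection(HBaseConfiguration.create())
HConnectionImplementation is the default implementation class of Connection. ConnectionFactory reads the class name from HConnection.HBASE_CLIENT_CONNECTION_IMPL, falls back to HConnectionImplementation, and invokes its non-public constructor reflectively: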
// ConnectionFactory.java
static Connection createConnection(final Configuration conf, final boolean managed,
    final ExecutorService pool, final User user)
    throws IOException {
  String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
      ConnectionManager.HConnectionImplementation.class.getName());
  Class<?> clazz = null;
  try {
    clazz = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  try {
    // Default HCM#HCI is not accessible; make it so before invoking.
    Constructor<?> constructor =
        clazz.getDeclaredConstructor(Configuration.class,
            boolean.class, ExecutorService.class, User.class);
    constructor.setAccessible(true);
    return (Connection) constructor.newInstance(conf, managed, pool, user);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
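The pattern above can be sketched in isolation: read an implementation class name from configuration, fall back to a default, then call a non-public constructor reflectively. This is a minimal sketch under assumptions. `Conn`, `DefaultConn`, the `demo.connection.impl` key and the `Map`-based configuration are hypothetical stand-ins for Connection, HConnectionImplementation, HConnection.HBASE_CLIENT_CONNECTION_IMPL and Configuration. For brevity, failures are wrapped in an unchecked exception, whereas the original wraps them in IOException.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

class ReflectiveFactoryDemo {
  // Hypothetical stand-in for the Connection interface.
  interface Conn {
    String describe();
  }

  // Hypothetical default implementation; like HConnectionImplementation,
  // its constructor is not publicly accessible.
  static class DefaultConn implements Conn {
    private final String user;

    private DefaultConn(Map<String, String> conf, String user) {
      this.user = user;
    }

    @Override
    public String describe() {
      return "DefaultConn for " + user;
    }
  }

  // Hypothetical config key (the real one is HConnection.HBASE_CLIENT_CONNECTION_IMPL).
  static final String IMPL_KEY = "demo.connection.impl";

  static Conn create(Map<String, String> conf, String user) {
    // Use the configured class name, or fall back to the default implementation.
    String className = conf.getOrDefault(IMPL_KEY, DefaultConn.class.getName());
    try {
      Class<?> clazz = Class.forName(className);
      // Look up the constructor by its exact parameter types.
      Constructor<?> ctor = clazz.getDeclaredConstructor(Map.class, String.class);
      // The constructor is private, so open it up before invoking it.
      ctor.setAccessible(true);
      return (Conn) ctor.newInstance(conf, user);
    } catch (ReflectiveOperationException e) {
      // Sketch only: the original wraps this in an IOException instead.
      throw new IllegalStateException("cannot create " + className, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(create(new HashMap<>(), "alice").describe());
  }
}
```

Running main prints `DefaultConn for alice`. Putting another class name under `demo.connection.impl` swaps the implementation without touching the calling code, which is what makes the connection class pluggable.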
Below is the HConnectionImplementation constructor invoked above. It creates the RpcClient via RpcClientFactory.createClient() and keeps it on the connection.
//ConnectionManager.java
HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool, User user) throws IOException {
  this(conf);
  this.user = user;
  this.batchPool = pool;
  this.managed = managed;
  this.registry = this.setupRegistry();
  this.retrieveClusterId();
  // Bind the RpcClient to this Connection
  this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
  this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
  boolean shouldListen = conf.getBoolean("hbase.status.published", false);
  Class listenerClass = conf.getClass("hbase.status.listener.class", ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, Listener.class);
  if (shouldListen) {
    if (listenerClass == null) {
      LOG.warn("hbase.status.published is true, but hbase.status.listener.class is not set - not listening status");
    } else {
      this.clusterStatusListener = new ClusterStatusListener(new DeadServerHandler() {
        public void newDead(ServerName sn) {
          HConnectionImplementation.this.clearCaches(sn);
          HConnectionImplementation.this.rpcClient.cancelConnections(sn);
        }
      }, conf, listenerClass);
    }
  }
}
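When hbase.status.published is enabled, the constructor registers a DeadServerHandler. Its newDead callback clears the cached state for that server and cancels the RpcClient's connections to it. A minimal sketch of that callback wiring follows. `ClientConnection`, `StatusListener`, the string server names and the map used as a location cache are hypothetical simplifications, not HBase classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DeadServerCallbackDemo {
  // Hypothetical analogue of ClusterStatusListener.DeadServerHandler.
  interface DeadServerHandler {
    void newDead(String serverName);
  }

  // Hypothetical client-side state: cached region locations and open RPC connections.
  static class ClientConnection {
    final Map<String, String> regionToServer = new HashMap<>();
    final Set<String> openRpcConnections = new HashSet<>();

    // Mirrors the two calls in newDead(): clear caches, then cancel connections.
    final DeadServerHandler handler = sn -> {
      regionToServer.values().removeIf(sn::equals);
      openRpcConnections.remove(sn);
    };
  }

  // Hypothetical listener that forwards dead-server notifications to its handlers.
  static class StatusListener {
    private final List<DeadServerHandler> handlers = new ArrayList<>();

    void register(DeadServerHandler h) {
      handlers.add(h);
    }

    void onDeadServers(List<String> deadServers) {
      for (String sn : deadServers) {
        for (DeadServerHandler h : handlers) {
          h.newDead(sn);
        }
      }
    }
  }

  public static void main(String[] args) {
    ClientConnection conn = new ClientConnection();
    conn.regionToServer.put("regionA", "rs1,16020");
    conn.regionToServer.put("regionB", "rs2,16020");
    conn.openRpcConnections.add("rs1,16020");
    conn.openRpcConnections.add("rs2,16020");

    StatusListener listener = new StatusListener();
    listener.register(conn.handler);
    listener.onDeadServers(List.of("rs1,16020"));

    System.out.println(conn.regionToServer + " " + conn.openRpcConnections);
  }
}
```

Running main prints `{regionB=rs2,16020} [rs2,16020]`: only the state tied to the dead server is dropped.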
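The default RpcClient implementation is RpcClientImpl. There is also an asynchronous implementation, AsyncRpcClient. RpcClientFactory chooses the class through CUSTOM_RPC_CLIENT_IMPL_CONF_KEY. The three-argument call above goes through an overload of this method: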
//RpcClientFactory.java
public static RpcClient createClient(Configuration conf, String clusterId,
    SocketAddress localAddr, MetricsConnection metrics) {
  String rpcClientClass =
      conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
          RpcClientImpl.class.getName());
  return ReflectionUtils.instantiateWithCustomCtor(
      rpcClientClass,
      new Class[] { Configuration.class, String.class, SocketAddress.class,
          MetricsConnection.class },
      new Object[] { conf, clusterId, localAddr, metrics }
  );
}
II. Next, let's look at how a concrete operation uses the RpcClient, taking table creation as the example.
//Admin.java
void createTable(HTableDescriptor desc) throws IOException;
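This is the table-creation method declared in org.apache.hadoop.hbase.client.Admin. Its implementation in HBaseAdmin goes through createTableAsyncV2: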
//HBaseAdmin.java
private Future<Void> createTableAsyncV2(final HTableDescriptor desc, final byte[][] splitKeys)
    throws IOException {
  if (desc.getTableName() == null) {
    throw new IllegalArgumentException("TableName cannot be null");
  }
  if (splitKeys != null && splitKeys.length > 0) {
    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
    // Verify there are no duplicate split keys
    byte[] lastKey = null;
    for (byte[] splitKey : splitKeys) {
      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
        throw new IllegalArgumentException(
            "Empty split key must not be passed in the split keys.");
      }
      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
        throw new IllegalArgumentException("All split keys must be unique, " +
            "found duplicate: " + Bytes.toStringBinary(splitKey) +
            ", " + Bytes.toStringBinary(lastKey));
      }
      lastKey = splitKey;
    }
  }
  CreateTableResponse response = executeCallable(
      new MasterCallable<CreateTableResponse>(getConnection()) {
        @Override
        public CreateTableResponse call(int callTimeout) throws ServiceException {
          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
          controller.setCallTimeout(callTimeout);
          controller.setPriority(desc.getTableName());
          CreateTableRequest request = RequestConverter.buildCreateTableRequest(
              desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
          // master is the stub that MasterCallable obtains by calling
          // getKeepAliveMasterService() on its connection
          return master.createTable(controller, request);
        }
      });
  return new CreateTableFuture(this, desc, splitKeys, response);
}
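That is the actual table-creation logic. The method is asynchronous: once the master has accepted the CreateTable request, it returns a CreateTableFuture rather than waiting for the table to be created.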
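The split-key checks at the top of createTableAsyncV2 can be reproduced without HBase. The method sorts the keys, rejects empty keys and rejects duplicates. In this sketch `Arrays.compareUnsigned` stands in for `Bytes.BYTES_COMPARATOR`, assuming both order byte arrays lexicographically as unsigned bytes. Unlike the original, the sketch sorts a copy instead of the caller's array. The class name is hypothetical.

```java
import java.util.Arrays;

class SplitKeyCheckDemo {
  // Returns the split keys sorted, or throws if any key is empty or repeated.
  static byte[][] validate(byte[][] splitKeys) {
    if (splitKeys == null || splitKeys.length == 0) {
      return splitKeys;
    }
    byte[][] sorted = splitKeys.clone(); // shallow copy: the caller's array order is untouched
    Arrays.sort(sorted, (a, b) -> Arrays.compareUnsigned(a, b));
    byte[] lastKey = null;
    for (byte[] key : sorted) {
      if (key.length == 0) {
        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
      }
      // After sorting, duplicates are adjacent, so comparing with the previous key suffices.
      if (lastKey != null && Arrays.equals(key, lastKey)) {
        throw new IllegalArgumentException("All split keys must be unique, found duplicate: "
            + Arrays.toString(key));
      }
      lastKey = key;
    }
    return sorted;
  }

  public static void main(String[] args) {
    // 0xFF sorts after 0x01 because bytes are compared as unsigned values.
    byte[][] keys = { {(byte) 0xFF}, {0x01}, {0x10} };
    for (byte[] k : validate(keys)) {
      System.out.println(Arrays.toString(k));
    }
  }
}
```

main prints `[1]`, `[16]` and `[-1]` in that order. Arrays.toString shows 0xFF as the signed value -1, but it still sorts last.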
The master stub is created in ConnectionManager; HConnectionImplementation is a nested class of ConnectionManager.
// ConnectionManager.java
Object makeStub() throws IOException {
  // The lock must be at the beginning to prevent multiple master creations
  // (and leaks) in a multithread context
  synchronized (masterAndZKLock) {
    Exception exceptionCaught = null;
    if (!closed) {
      try {
        return makeStubNoRetries();
      } catch (IOException e) {
        exceptionCaught = e;
      } catch (KeeperException e) {
        exceptionCaught = e;
      } catch (ServiceException e) {
        exceptionCaught = e;
      }
      throw new MasterNotRunningException(exceptionCaught);
    } else {
      throw new DoNotRetryIOException("Connection was closed while trying to get master");
    }
  }
}
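The stub is an implementation of BlockingInterface, the protobuf-generated interface that declares the operations the HBase client can call. It is bound to a BlockingRpcChannel and makes its RPC calls through that channel. makeStubNoRetries() finds the master through ZooKeeper and caches one stub per master address: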
// ConnectionManager.java
private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
  ZooKeeperKeepAliveConnection zkw;
  try {
    zkw = getKeepAliveZooKeeperWatcher();
  } catch (IOException e) {
    ExceptionUtil.rethrowIfInterrupt(e);
    throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
  }
  try {
    checkIfBaseNodeAvailable(zkw);
    // Look up the active master's address in ZooKeeper
    ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
    if (sn == null) {
      String msg = "ZooKeeper available but no active master location found";
      LOG.info(msg);
      throw new MasterNotRunningException(msg);
    }
    if (isDeadServer(sn)) {
      throw new MasterNotRunningException(sn + " is dead.");
    }
    // Use the security info interface name as our stub key
    String key = getStubKey(getServiceName(),
        sn.getHostname(), sn.getPort(), hostnamesCanChange);
    connectionLock.putIfAbsent(key, key);
    Object stub = null;
    synchronized (connectionLock.get(key)) {
      stub = stubs.get(key);
      if (stub == null) {
        BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
        stub = makeStub(channel);
        isMasterRunning();
        stubs.put(key, stub);
      }
    }
    return stub;
  } finally {
    zkw.close();
  }
}
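The `connectionLock.putIfAbsent(key, key)` / `synchronized (connectionLock.get(key))` pair gives every stub key its own lock object. All threads end up synchronizing on the single String instance stored in the map, even when each one built an equal but distinct key string. As a result, at most one stub is created per master address, while lookups for different keys do not block each other. Below is a minimal sketch of that idiom. It assumes a simplified `service@host:port` key and uses a plain `Object` in place of the stub built from a BlockingRpcChannel; both are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class StubCacheDemo {
  private final ConcurrentMap<String, String> connectionLock = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
  final AtomicInteger created = new AtomicInteger();

  Object getStub(String service, String host, int port) {
    // Hypothetical key format; HBase builds its key with getStubKey(...).
    String key = service + "@" + host + ":" + port;
    // Store the key as its own lock; later callers get back the first instance stored.
    connectionLock.putIfAbsent(key, key);
    synchronized (connectionLock.get(key)) {
      Object stub = stubs.get(key);
      if (stub == null) {
        // Stands in for makeStub(rpcClient.createBlockingRpcChannel(...)).
        stub = new Object();
        created.incrementAndGet();
        stubs.put(key, stub);
      }
      return stub;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    StubCacheDemo cache = new StubCacheDemo();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      Thread t = new Thread(() -> cache.getStub("MasterService", "master1", 16000));
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      t.join();
    }
    System.out.println("stubs created: " + cache.created.get());
  }
}
```

main prints `stubs created: 1` even though eight threads race for the same key. A single global lock would also prevent duplicate stubs, but it would serialize stub creation for unrelated servers as well.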
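The BlockingRpcChannel hands each call to AbstractRpcClient, whose call method follows the template method pattern. AbstractRpcClient defines the common flow, and RpcClientImpl and AsyncRpcClient each implement the transport-specific call.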
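A template method fixes the overall flow of an operation in an abstract base class and leaves one step abstract for subclasses to fill in. The sketch below shows only that shape. The names echo AbstractRpcClient, RpcClientImpl and AsyncRpcClient, but `AbstractClient`, `BlockingClient`, `AsyncClient` and all method bodies are hypothetical. The bookkeeping in the skeleton (an argument check and a call counter) is not what HBase actually does there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class TemplateMethodDemo {
  // Hypothetical, heavily simplified analogue of AbstractRpcClient.
  abstract static class AbstractClient {
    final AtomicInteger callCount = new AtomicInteger();

    // Template method: the fixed skeleton shared by every transport.
    final String callBlockingMethod(String method, String param) {
      if (method == null || method.isEmpty()) {
        throw new IllegalArgumentException("method name required");
      }
      callCount.incrementAndGet(); // shared bookkeeping
      return call(method, param);  // transport-specific step
    }

    // Primitive operation supplied by each concrete client.
    protected abstract String call(String method, String param);
  }

  // Illustrative subclass that completes the call on the calling thread.
  static class BlockingClient extends AbstractClient {
    @Override
    protected String call(String method, String param) {
      return "blocking " + method + "(" + param + ")";
    }
  }

  // Illustrative subclass that runs the call on another thread and waits for the result.
  static class AsyncClient extends AbstractClient {
    @Override
    protected String call(String method, String param) {
      return CompletableFuture.supplyAsync(() -> "async " + method + "(" + param + ")").join();
    }
  }

  public static void main(String[] args) {
    AbstractClient[] clients = { new BlockingClient(), new AsyncClient() };
    for (AbstractClient c : clients) {
      System.out.println(c.callBlockingMethod("CreateTable", "t1"));
    }
  }
}
```

Callers only ever use callBlockingMethod. Switching the transport therefore means choosing a different subclass, and the CUSTOM_RPC_CLIENT_IMPL_CONF_KEY setting in RpcClientFactory enables exactly that.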
RpcClientImpl's inner class Connection opens the socket connection to the HBase server and carries out the actual communication.
// RpcClientImpl.java
// Connection.CallSender.run(): keeps taking CallFutures off the queue and sending them
public void run() {
  while (!shouldCloseConnection.get()) {
    CallFuture cts = null;
    try {
      cts = callsToWrite.take();
    } catch (InterruptedException e) {
      markClosed(new InterruptedIOException());
    }
    if (cts == null || cts == CallFuture.DEATH_PILL) {
      assert shouldCloseConnection.get();
      break;
    }
    if (cts.call.done) {
      continue;
    }
    if (cts.call.checkAndSetTimeout()) {
      continue;
    }
    try {
      Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
    } catch (IOException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("call write error for call #" + cts.call.id
            + ", message =" + e.getMessage());
      }
      cts.call.setException(e);
      markClosed(e);
    }
  }
  cleanup();
}
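CallSender is a producer/consumer loop. Callers enqueue CallFutures, one thread drains the queue and writes them to the socket, and a shared sentinel (CallFuture.DEATH_PILL) wakes the thread when the connection is closed. The sketch below keeps just that skeleton. `Call`, the integer ids and the list standing in for the socket are hypothetical. Unlike RpcClientImpl, the sketch neither checks timeouts nor fails the calls left in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class CallSenderDemo {
  // Hypothetical, simplified call record.
  static final class Call {
    final int id;
    volatile boolean done;

    Call(int id) {
      this.id = id;
    }
  }

  // Sentinel object: compared by identity, never written.
  static final Call DEATH_PILL = new Call(-1);

  final BlockingQueue<Call> callsToWrite = new LinkedBlockingQueue<>();
  final AtomicBoolean shouldClose = new AtomicBoolean(false);
  final List<Integer> written = new ArrayList<>(); // stands in for the socket output stream

  // Sender loop: take calls in FIFO order until closed or the sentinel arrives.
  void runSender() {
    while (!shouldClose.get()) {
      Call call;
      try {
        call = callsToWrite.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        markClosed();
        break;
      }
      if (call == DEATH_PILL) {
        break;
      }
      if (call.done) {
        continue; // already completed (e.g. cancelled), nothing to send
      }
      written.add(call.id);
    }
  }

  // Mark the connection closed and wake a sender blocked in take().
  void markClosed() {
    shouldClose.set(true);
    callsToWrite.add(DEATH_PILL);
  }

  public static void main(String[] args) throws InterruptedException {
    CallSenderDemo demo = new CallSenderDemo();
    Thread sender = new Thread(demo::runSender);
    sender.start();
    Call cancelled = new Call(2);
    cancelled.done = true;
    demo.callsToWrite.add(new Call(1));
    demo.callsToWrite.add(cancelled);
    demo.callsToWrite.add(new Call(3));
    demo.callsToWrite.add(DEATH_PILL); // stop after everything queued so far
    sender.join();
    System.out.println(demo.written);
  }
}
```

main prints `[1, 3]`: call 2 is skipped because it is already done, and the sentinel stops the thread without any extra signalling.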
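Parameters are exchanged as protobuf. The request parameters are built into a com.google.protobuf.Message, which writeRequest (in RpcClientImpl's Connection) writes directly to the socket's output stream, together with a RequestHeader and an optional cell block: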
private void writeRequest(Call call, final int priority, Span span) throws IOException {
  RequestHeader.Builder builder = RequestHeader.newBuilder();
  builder.setCallId(call.id);
  if (span != null) {
    builder.setTraceInfo(
        RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
  }
  builder.setMethodName(call.md.getName());
  builder.setRequestParam(call.param != null);
  ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
  if (cellBlock != null) {
    CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
    cellBlockBuilder.setLength(cellBlock.limit());
    builder.setCellBlockMeta(cellBlockBuilder.build());
  }
  // Only pass priority if there one. Let zero be same as no priority.
  if (priority != 0) builder.setPriority(priority);
  RequestHeader header = builder.build();
  setupIOstreams();
  // Now we're going to write the call. We take the lock, then check that the connection
  // is still valid, and, if so we do the write to the socket. If the write fails, we don't
  // know where we stand, we have to close the connection.
  checkIsOpen();
  IOException writeException = null;
  synchronized (this.outLock) {
    if (Thread.interrupted()) throw new InterruptedIOException();
    calls.put(call.id, call); // We put first as we don't want the connection to become idle.
    checkIsOpen(); // Now we're checking that it didn't became idle in between.
    try {
      call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,
          cellBlock));
    } catch (IOException e) {
      // We set the value inside the synchronized block, this way the next in line
      // won't even try to write. Otherwise we might miss a call in the calls map?
      shouldCloseConnection.set(true);
      writeException = e;
      interrupt();
    }
  }
  // call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474
  if (writeException != null) {
    markClosed(writeException);
    close();
  }
  // We added a call, and may be started the connection close. In both cases, we
  // need to notify the reader.
  doNotify();
  // Now that we notified, we can rethrow the exception if any. Otherwise we're good.
  if (writeException != null) throw writeException;
}
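The excerpt ends in IPCUtil.write, whose body the article does not show. Assuming the HBase 1.x layout (verify it against the version in use), it writes a 4-byte big-endian total length first. Then it writes the RequestHeader and the request parameter, each prefixed with its varint length as protobuf's writeDelimitedTo does, followed by the raw cell block. It returns the total length, which the code above records with setRequestSizeBytes. Writing the length first presumably lets the server read the whole request before parsing it. The sketch below reproduces that framing with plain byte arrays standing in for the serialized protobuf messages, so it runs without the protobuf library. `FrameWriterDemo` and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class FrameWriterDemo {
  // Number of bytes a protobuf-style unsigned varint needs for value.
  static int varintSize(int value) {
    int size = 1;
    while ((value & ~0x7F) != 0) {
      size++;
      value >>>= 7;
    }
    return size;
  }

  // Base-128 varint, low bits first: 7 bits per byte, high bit set on all but the last byte.
  static void writeVarint(DataOutputStream out, int value) throws IOException {
    while ((value & ~0x7F) != 0) {
      out.writeByte((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.writeByte(value);
  }

  // Writes [int totalSize][varint][header][varint][param][cellBlock] and returns totalSize.
  static int write(DataOutputStream out, byte[] header, byte[] param, byte[] cellBlock)
      throws IOException {
    int totalSize = varintSize(header.length) + header.length;
    if (param != null) totalSize += varintSize(param.length) + param.length;
    if (cellBlock != null) totalSize += cellBlock.length;
    out.writeInt(totalSize); // 4 bytes, big-endian; the prefix itself is not counted
    writeVarint(out, header.length);
    out.write(header);
    if (param != null) {
      writeVarint(out, param.length);
      out.write(param);
    }
    if (cellBlock != null) out.write(cellBlock);
    out.flush();
    return totalSize;
  }

  // Convenience wrapper that frames one request into a byte array.
  static byte[] frame(byte[] header, byte[] param, byte[] cellBlock) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try {
      write(new DataOutputStream(bytes), header, param, cellBlock);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  public static void main(String[] args) {
    byte[] framed = frame(new byte[3], new byte[2], new byte[4]);
    // 4-byte prefix + (1 + 3) + (1 + 2) + 4 = 15 bytes; the prefix holds 11.
    System.out.println(framed.length + " bytes, prefix=" + framed[3]);
  }
}
```

main prints `15 bytes, prefix=11`. Because the cell block carries no length of its own in the stream, the reader relies on the CellBlockMeta length set in the header to know how many bytes belong to it.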
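Summary:
Tracing table creation shows that HBase RPC is built around protobuf. A Connection holds an RpcClient, and the RpcClient serializes protobuf messages onto the socket to the server. Responses are likewise read back from the input stream as protobuf messages.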