一、所有的hbase的客户端操作都是通过 Connection来进行的,从创建 Connection入口进行分析。

ConnectionFactory.createConnection(HBaseConfiguration.create())

HConnectionImplementation为Connection的实现类。

// ConnectionFactory.java

static Connection createConnection(final Configuration conf, final boolean managed,

    final ExecutorService pool, final User user)
throws IOException {
  String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
    ConnectionManager.HConnectionImplementation.class.getName());
  Class<?> clazz = null;
  try {
    clazz = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  try {
    // Default HCM#HCI is not accessible; make it so before invoking.
    Constructor<?> constructor =
      clazz.getDeclaredConstructor(Configuration.class,
        boolean.class, ExecutorService.class, User.class);
    constructor.setAccessible(true);
    return (Connection) constructor.newInstance(conf, managed, pool, user);
  } catch (Exception e) {
    throw new IOException(e);
  }
}

HConnectionImplementation所调用的构造方法。通过 RpcClientFactory.createClient()创建了RpcClient。

//ConnectionManager.java

HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool, User user) throws IOException {

    this(conf);
    this.user = user;
    this.batchPool = pool;
    this.managed = managed;
    this.registry = this.setupRegistry();

    this.retrieveClusterId();

    //RpcClient绑定到了 Connection上

    this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    boolean shouldListen = conf.getBoolean("hbase.status.published", false);
    Class listenerClass = conf.getClass("hbase.status.listener.class", ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, Listener.class);
    if(shouldListen) {
        if(listenerClass == null) {
            LOG.warn("hbase.status.published is true, but hbase.status.listener.class is not set - not listening status");
        } else {
            this.clusterStatusListener = new ClusterStatusListener(new DeadServerHandler() {
                public void newDead(ServerName sn) {
                    HConnectionImplementation.this.clearCaches(sn);
                    HConnectionImplementation.this.rpcClient.cancelConnections(sn);
                }
            }, conf, listenerClass);
        }

    }

}

RpcClient的实现类为RpcClientImpl(还有一个异步的RpcClient->AsyncRpcClient)

//RpcClientFactory.java

public static RpcClient createClient(Configuration conf, String clusterId,

    SocketAddress localAddr, MetricsConnection metrics) {
  String rpcClientClass =
      conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
        RpcClientImpl.class.getName());
  return ReflectionUtils.instantiateWithCustomCtor(
      rpcClientClass,
      new Class[] { Configuration.class, String.class, SocketAddress.class,
          MetricsConnection.class },
      new Object[] { conf, clusterId, localAddr, metrics }
  );
}

二、接下来从具体的操作的分析怎么使用 RpcClient,列子为创建表的方法

//Admin.java

void createTable(HTableDescriptor desc) throws IOException;

上面是org.apache.hadoop.hbase.client.Admin创建表的方法

//HBaseAdmin.java

private Future<Void> createTableAsyncV2(final HTableDescriptor desc, final byte[][] splitKeys)

    throws IOException {
  if (desc.getTableName() == null) {
    throw new IllegalArgumentException("TableName cannot be null");
  }
  if (splitKeys != null && splitKeys.length > 0) {
    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
    // Verify there are no duplicate split keys
    byte[] lastKey = null;
    for (byte[] splitKey : splitKeys) {
      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
        throw new IllegalArgumentException(
            "Empty split key must not be passed in the split keys.");
      }
      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
        throw new IllegalArgumentException("All split keys must be unique, " +
          "found duplicate: " + Bytes.toStringBinary(splitKey) +
          ", " + Bytes.toStringBinary(lastKey));
      }
      lastKey = splitKey;
    }

  }

  CreateTableResponse response = executeCallable(
    new MasterCallable<CreateTableResponse>(getConnection()) {
      @Override
      public CreateTableResponse call(int callTimeout) throws ServiceException {
        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
        controller.setCallTimeout(callTimeout);
        controller.setPriority(desc.getTableName());
        CreateTableRequest request = RequestConverter.buildCreateTableRequest(

          desc, splitKeys, ng.getNonceGroup(), ng.newNonce());

        //此处的master为 MasterCallable的connection调用getKeepAliveMasterService绑定的stub

        return master.createTable(controller, request);
      }
    });
  return new CreateTableFuture(this, desc, splitKeys, response);

}

上面的具体的创建表的实现,此方法是异步的。

通过ConnectionManager创建master的stub, HConnectionImplementation为 ConnectionManager的内部类。

// ConnectionManager.java

Object makeStub() throws IOException {

  // The lock must be at the beginning to prevent multiple master creations
  //  (and leaks) in a multithread context
  synchronized (masterAndZKLock) {
    Exception exceptionCaught = null;
    if (!closed) {
      try {
        return makeStubNoRetries();
      } catch (IOException e) {
        exceptionCaught = e;
      } catch (KeeperException e) {
        exceptionCaught = e;
      } catch (ServiceException e) {
        exceptionCaught = e;

      }

      throw new MasterNotRunningException(exceptionCaught);
    } else {
      throw new DoNotRetryIOException("Connection was closed while trying to get master");
    }
  }
}

stub为BlockingInterface的实现(hbase客户端操作方法的接口),绑定了 BlockingRpcChannel,并通过 BlockingRpcChannel进行Rpc调用。

// ConnectionManager.java

private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {

  ZooKeeperKeepAliveConnection zkw;
  try {
    zkw = getKeepAliveZooKeeperWatcher();
  } catch (IOException e) {
    ExceptionUtil.rethrowIfInterrupt(e);
    throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
  }
  try {

    checkIfBaseNodeAvailable(zkw);

    // 通过zookeeper获取master的地址

    ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
    if (sn == null) {
      String msg = "ZooKeeper available but no active master location found";
      LOG.info(msg);
      throw new MasterNotRunningException(msg);
    }
    if (isDeadServer(sn)) {
      throw new MasterNotRunningException(sn + " is dead.");
    }
    // Use the security info interface name as our stub key
    String key = getStubKey(getServiceName(),
        sn.getHostname(), sn.getPort(), hostnamesCanChange);
    connectionLock.putIfAbsent(key, key);
    Object stub = null;
    synchronized (connectionLock.get(key)) {
      stub = stubs.get(key);
      if (stub == null) {
        BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
        stub = makeStub(channel);
        isMasterRunning();
        stubs.put(key, stub);
      }
    }
    return stub;
  } finally {
    zkw.close();
  }

}

BlockingRpcChannel调用AbstractRpcClient.call,此方法为模板方法,分别为 RpcClientImpl和AsyncRpcClient所实现。

RpcClientImpl的内部类Connection与hbase建立socket连接,进行具体的通信。

// RpcClientImpl.java

// Connection中CallSender不断的队列里取出CallFuture,进行发送

public void run() {

  while (!shouldCloseConnection.get()) {
    CallFuture cts = null;
    try {
      cts = callsToWrite.take();
    } catch (InterruptedException e) {
      markClosed(new InterruptedIOException());

    }

    if (cts == null || cts == CallFuture.DEATH_PILL) {
      assert shouldCloseConnection.get();
      break;

    }

    if (cts.call.done) {
      continue;

    }

    if (cts.call.checkAndSetTimeout()) {
      continue;

    }

    try {
      Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
    } catch (IOException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("call write error for call #" + cts.call.id
          + ", message =" + e.getMessage());
      }
      cts.call.setException(e);
      markClosed(e);
    }

  }

  cleanup();
}

参数交互方式采用protobuf,传参值构建成的com.google.protobuf.Message直接写到socket输出流。

private void writeRequest(Call call, final int priority, Span span) throws IOException {
  RequestHeader.Builder builder = RequestHeader.newBuilder();
  builder.setCallId(call.id);
  if (span != null) {
    builder.setTraceInfo(
        RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
  }
  builder.setMethodName(call.md.getName());
  builder.setRequestParam(call.param != null);
  ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
  if (cellBlock != null) {
    CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
    cellBlockBuilder.setLength(cellBlock.limit());
    builder.setCellBlockMeta(cellBlockBuilder.build());
  }
  // Only pass priority if there one.  Let zero be same as no priority.
  if (priority != 0) builder.setPriority(priority);
  RequestHeader header = builder.build();

  setupIOstreams();

  // Now we're going to write the call. We take the lock, then check that the connection
  //  is still valid, and, if so we do the write to the socket. If the write fails, we don't
  //  know where we stand, we have to close the connection.
  checkIsOpen();
  IOException writeException = null;
  synchronized (this.outLock) {
    if (Thread.interrupted()) throw new InterruptedIOException();

    calls.put(call.id, call); // We put first as we don't want the connection to become idle.
    checkIsOpen(); // Now we're checking that it didn't became idle in between.

    try {
      call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,
          cellBlock));
    } catch (IOException e) {
      // We set the value inside the synchronized block, this way the next in line
      //  won't even try to write. Otherwise we might miss a call in the calls map?
      shouldCloseConnection.set(true);
      writeException = e;
      interrupt();
    }
  }

  // call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474
  if (writeException != null) {
    markClosed(writeException);
    close();
  }

  // We added a call, and may be started the connection close. In both cases, we
  //  need to notify the reader.
  doNotify();

  // Now that we notified, we can rethrow the exception if any. Otherwise we're good.
  if (writeException != null) throw writeException;
}

总结:

通过创建表的过程分析总结,Hbase的RPC核心在于protobuf的运用, Connection绑定了RpcClient,通过RpcClient将protobuf数据格式写到服务端,接收时也是在输入流中取 protobuf格式的数据。

03-27 18:23