1. Architecture Design
(1) Scheduling
The PD is the cluster's global central control node, responsible for cluster-wide scheduling, Region ID generation, and maintenance of the RegionRouteTable routing table. A single PD server can manage multiple clusters, which are isolated from each other by clusterId. The PD server must be deployed separately, but many scenarios do not actually need self-management, so RheaKV also supports running without a PD: a cluster that does not need self-management simply sets the fake option of PlacementDriverOptions to true. The PD generally drives Region scheduling through the Region heartbeat responses; once a Region has carried out the instructions, the PD receives the Region's change information in the next heartbeat and updates its routing and state tables.
(2) Storage
A Store is a physical storage node in the cluster; one Store contains one or more Regions. Typically one Node runs one Store. A Store can be viewed as a container of Regions, holding the data of multiple shards. The Store proactively reports a StoreHeartbeatRequest heartbeat to the PD, handled by the PD's handleStoreHeartbeat; it carries the Store's basic information, e.g., how many Regions it contains and which Region Leaders reside on this Store.
(3) Data
A Region is the smallest unit of KV data and can be understood as a data partition or shard. Each Region covers a left-closed, right-open key interval [startKey, endKey) and can split automatically based on metrics such as request traffic, load, and data size, and have its replicas migrated automatically. A Region corresponds to an actual data range inside a Store; each Region has multiple replicas, each stored on a different Store, which together form a Raft group, and data is synchronized to all nodes of the group via Raft log replication. The Leader of a Region proactively reports a RegionHeartbeatRequest heartbeat to the PD, handled by the PD's handleRegionHeartbeat, and the PD detects whether a Region has changed through the Region's Epoch.
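To make the left-closed, right-open interval semantics concrete, here is a minimal self-contained sketch (not RheaKV's own code; the class and method names are made up for illustration) that matches a key against a Region's range using unsigned lexicographic byte comparison, treating an empty endKey as unbounded:
import java.nio.charset.StandardCharsets;

public final class RegionRange {

    // true if key falls in the left-closed, right-open range [startKey, endKey);
    // a null/empty endKey means "unbounded on the right"
    public static boolean contains(final byte[] startKey, final byte[] endKey, final byte[] key) {
        if (compare(key, startKey) < 0) {
            return false; // key < startKey
        }
        return endKey == null || endKey.length == 0 || compare(key, endKey) < 0;
    }

    // unsigned lexicographic byte-array comparison, the ordering the route table relies on
    private static int compare(final byte[] a, final byte[] b) {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(final String[] args) {
        final byte[] start = "b".getBytes(StandardCharsets.UTF_8);
        final byte[] end = "m".getBytes(StandardCharsets.UTF_8);
        System.out.println(contains(start, end, "b".getBytes(StandardCharsets.UTF_8))); // true, left-closed
        System.out.println(contains(start, end, "m".getBytes(StandardCharsets.UTF_8))); // false, right-open
    }
}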
To make the roles of the three core components (PD, Store, and Region) clearer, here is the official diagram:
2. Initialization
- Declare the two options beans, PlacementDriverOptions and StoreEngineOptions.
- Define RheaKVStoreOptions, wire the PD options and store-engine options into it, and set the cluster name, whether to enable parallel compression, and the server address list ("127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183").
- Declare a Node.
- Add a shutdown hook for graceful shutdown. (The author has analyzed shutdown hooks before; see SOFAJRaft源码阅读(叁)-ShutdownHook如何优雅的停机.)
public static void main(final String[] args) {
final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
.withFake(true) // use a fake pd
.config();
final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() //
//the StoreEngine supports two storage implementations: MemoryDB and RocksDB
.withStorageType(StorageType.RocksDB)
.withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config())
.withRaftDataPath(Configs.RAFT_DATA_PATH)
.withServerAddress(new Endpoint("127.0.0.1", 8181))
.config();
final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
.withClusterName(Configs.CLUSTER_NAME) //
.withUseParallelCompress(true) //
.withInitialServerList(Configs.ALL_NODE_ADDRESSES)
.withStoreEngineOptions(storeOpts) //
.withPlacementDriverOptions(pdOpts) //
.config();
System.out.println(opts);
final Node node = new Node(opts);
node.start();
Runtime.getRuntime().addShutdownHook(new Thread(node::stop));
System.out.println("server1 start OK");
}
The Node implementation simply holds a RheaKVStoreOptions and a RheaKVStore:
public class Node {
private final RheaKVStoreOptions options;
private RheaKVStore rheaKVStore;
public Node(RheaKVStoreOptions options) {
this.options = options;
}
public void start() {
this.rheaKVStore = new DefaultRheaKVStore();
this.rheaKVStore.init(this.options);
}
public void stop() {
this.rheaKVStore.shutdown();
}
public RheaKVStore getRheaKVStore() {
return rheaKVStore;
}
}
(1) DefaultRheaKVStore initialization
As shown above, Node#start creates a DefaultRheaKVStore and calls its init method, which does the following:
- Check whether it has already started.
- Initialize the PD client. The PlacementDriverClient interface is mainly implemented by AbstractPlacementDriverClient, with FakePlacementDriverClient and RemotePlacementDriverClient carrying the main functionality.
- FakePlacementDriverClient mocks a PD object for deployments that do not need a PD.
- RemotePlacementDriverClient is loaded from PlacementDriverOptions and refreshes the routing table from the base configuration; it manages the RegionRouteTable, e.g., looking up Store, route, and Leader information; it also holds a CliService, through which replica operations such as addReplica, removeReplica, and transferLeader can be performed externally.
- Initialize the StoreEngine; two storage engine implementations are currently supported, MemoryDB and RocksDB.
- Initialize the DefaultRheaKVRpcService. DefaultRegionKVService is the default implementation of RegionKVService and handles the concrete operations on a Region.
- Load the failover settings: two retries by default, a wait timeout futureTimeoutMillis of 5000 ms by default, and reads served only by the leader by default.
- Initialize the kvDispatcher.
- After DefaultRheaKVStore has completed all of these steps, get, put, scan and their synchronous/asynchronous variants become available (see the usage sketch after the code below).
public synchronized boolean init(final RheaKVStoreOptions opts) {
//return immediately if already started
if (this.started) {
LOG.info("[DefaultRheaKVStore] already started.");
return true;
}
DescriberManager.getInstance().addDescriber(RouteTable.getInstance());
this.opts = opts;
//initialize the PD client from PlacementDriverOptions
final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions();
final String clusterName = opts.getClusterName();
Requires.requireNonNull(pdOpts, "opts.placementDriverOptions");
Requires.requireNonNull(clusterName, "opts.clusterName");
if (Strings.isBlank(pdOpts.getInitialServerList())) {
// if blank, inherit the parent's value
pdOpts.setInitialServerList(opts.getInitialServerList());
}
//PD disabled: instantiate a FakePlacementDriverClient
if (pdOpts.isFake()) {
this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName);
//PD enabled: instantiate a RemotePlacementDriverClient
} else {
this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName);
}
//init the FakePlacementDriverClient/RemotePlacementDriverClient
if (!this.pdClient.init(pdOpts)) {
LOG.error("Fail to init [PlacementDriverClient].");
return false;
}
// init the compression strategy
ZipStrategyManager.init(opts);
// init the storage engine
final StoreEngineOptions stOpts = opts.getStoreEngineOptions();
if (stOpts != null) {
stOpts.setInitialServerList(opts.getInitialServerList());
this.storeEngine = new StoreEngine(this.pdClient, this.stateListenerContainer);
if (!this.storeEngine.init(stOpts)) {
LOG.error("Fail to init [StoreEngine].");
return false;
}
}
//the current node's address (null when no StoreEngine is configured, i.e. a pure client)
final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint();
final RpcOptions rpcOpts = opts.getRpcOptions();
Requires.requireNonNull(rpcOpts, "opts.rpcOptions");
//create the RpcService, overriding getLeader to try the local RegionEngine first
this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {
@Override
public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
final Endpoint leader = getLeaderByRegionEngine(regionId);
if (leader != null) {
return leader;
}
return super.getLeader(regionId, forceRefresh, timeoutMillis);
}
};
if (!this.rheaKVRpcService.init(rpcOpts)) {
LOG.error("Fail to init [RheaKVRpcService].");
return false;
}
//failover retry count, 2 by default
this.failoverRetries = opts.getFailoverRetries();
//future wait timeout, 5000 ms by default
this.futureTimeoutMillis = opts.getFutureTimeoutMillis();
//whether to read only from the leader, true by default
this.onlyLeaderRead = opts.isOnlyLeaderRead();
//init the kvDispatcher (useParallelKVExecutor defaults to true)
if (opts.isUseParallelKVExecutor()) {
final int numWorkers = Utils.cpus();
//ring buffer size = number of workers * 16
final int bufSize = numWorkers << 4;
final String name = "parallel-kv-executor";
final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED
? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true);
//create the dispatcher
this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT, threadFactory);
}
this.batchingOpts = opts.getBatchingOptions();
//allowBatching defaults to true
if (this.batchingOpts.isAllowBatching()) {
this.getBatching = new GetBatching(KeyEvent::new, "get_batching",
new GetBatchingHandler("get", false));
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
new GetBatchingHandler("get_only_safe", true));
this.putBatching = new PutBatching(KVEvent::new, "put_batching",
new PutBatchingHandler("put"));
}
LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts);
return this.started = true;
}
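Once init has completed, the store can be used directly. Below is a minimal usage sketch (my own example, not from the original walkthrough): the b-prefixed methods of the RheaKVStore interface are the blocking variants, while get/put return CompletableFuture; BytesUtil is JRaft's byte utility.
import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
import com.alipay.sofa.jraft.rhea.storage.KVEntry;
import com.alipay.sofa.jraft.util.BytesUtil;

public class KvUsageExample {

    // assumes `node` is the Node shown above and has already been started
    public static void demo(final Node node) {
        final RheaKVStore store = node.getRheaKVStore();

        // blocking put/get
        store.bPut(BytesUtil.writeUtf8("foo"), BytesUtil.writeUtf8("bar"));
        System.out.println(BytesUtil.readUtf8(store.bGet(BytesUtil.writeUtf8("foo")))); // bar

        // blocking scan over [startKey, endKey); range partitioning keeps this
        // concentrated in the Regions covering the range
        final List<KVEntry> entries = store.bScan(BytesUtil.writeUtf8("a"), BytesUtil.writeUtf8("z"));
        for (final KVEntry entry : entries) {
            System.out.println(BytesUtil.readUtf8(entry.getKey()));
        }

        // asynchronous variant returning a CompletableFuture
        final CompletableFuture<Boolean> f = store.put(BytesUtil.writeUtf8("k"), BytesUtil.writeUtf8("v"));
        f.join();
    }
}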
(2) StoreEngine initialization
A Store is a node containing one or more RegionEngines. A StoreEngine usually calls into the PD through the PlacementDriverClient, and carries a StoreEngineOptions configuration holding the storage-engine and node-related settings. Initialization:
- Check whether it has already started.
- Load the StoreEngineOptions, build the corresponding Store configuration, and create the read-index executor readIndexExecutor for linearizable reads, the snapshot executor snapshotExecutor, the CLI RPC executor cliRpcExecutor, the Raft RPC executor raftRpcExecutor, the KV RPC executor kvRpcExecutor, the HeartbeatSender, and so on.
- Set metricsReportPeriod to enable performance metrics reporting.
- Initialize the RegionEngines.
- If the cluster is self-managed, initialize the heartbeat sender.
public synchronized boolean init(final StoreEngineOptions opts) {
// return immediately if already started
if (this.started) {
LOG.info("[StoreEngine] already started.");
return true;
}
DescriberManager.getInstance().addDescriber(this);
this.storeOpts = Requires.requireNonNull(opts, "opts");
Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress");
//resolve ip and port
final int port = serverAddress.getPort();
final String ip = serverAddress.getIp();
//if no ip is configured, use the local host name as the serverAddress ip
if (ip == null || Utils.IP_ANY.equals(ip)) {
serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port);
opts.setServerAddress(serverAddress);
}
//metrics report period
final long metricsReportPeriod = opts.getMetricsReportPeriod();
// init RegionEngineOptions
List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
//if no RegionEngineOptions is configured, create a default one
if (rOptsList == null || rOptsList.isEmpty()) {
// -1 region
final RegionEngineOptions rOpts = new RegionEngineOptions();
rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
rOptsList = Lists.newArrayList();
rOptsList.add(rOpts);
opts.setRegionEngineOptionsList(rOptsList);
}
//cluster name
final String clusterName = this.pdClient.getClusterName();
//iterate over rOptsList and fill in each RegionEngineOptions
for (final RegionEngineOptions rOpts : rOptsList) {
//raftGroupId = clusterName + "-" + regionId
rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
rOpts.setServerAddress(serverAddress);
if (Strings.isBlank(rOpts.getInitialServerList())) {
// if blank, extends parent's value
rOpts.setInitialServerList(opts.getInitialServerList());
}
if (rOpts.getNodeOptions() == null) {
// copy common node options
rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
.getCommonNodeOptions().copy());
}
//inherit the store-level metrics report period if none is set
if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
// extends store opts
rOpts.setMetricsReportPeriod(metricsReportPeriod);
}
}
// init the Store and the Regions inside it
final Store store = this.pdClient.getStoreMetadata(opts);
if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) {
LOG.error("Empty store metadata: {}.", store);
return false;
}
this.storeId = store.getId();
this.partRocksDBOptions = SystemPropertyUtil.getBoolean(PART_ROCKSDB_OPTIONS_KEY, false);
// init executors
if (this.readIndexExecutor == null) {
this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
}
if (this.raftStateTrigger == null) {
this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads());
}
if (this.snapshotExecutor == null) {
this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads(),
opts.getSnapshotMaxThreads());
}
// init rpc executors
final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor();
// init dedicated rpc executors that run the RpcServer's processors
if (!useSharedRpcExecutor) {
if (this.cliRpcExecutor == null) {
this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads());
}
if (this.raftRpcExecutor == null) {
this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads());
}
if (this.kvRpcExecutor == null) {
this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads());
}
}
// start metric reporters
startMetricReporters(metricsReportPeriod);
// init the rpcServer that other nodes call into
this.rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverAddress, this.raftRpcExecutor,
this.cliRpcExecutor);
//register the processors on the server
StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this);
if (!this.rpcServer.init(null)) {
LOG.error("Fail to init [RpcServer].");
return false;
}
// init db store
// choose the db implementation by storage type
if (!initRawKVStore(opts)) {
return false;
}
if (this.rawKVStore instanceof Describer) {
DescriberManager.getInstance().addDescriber((Describer) this.rawKVStore);
}
// init all region engine
// init a RegionEngine for every region
if (!initAllRegionEngine(opts, store)) {
LOG.error("Fail to init all [RegionEngine].");
return false;
}
// heartbeat sender
// for a self-managed cluster (a real PD), init the heartbeat sender
if (this.pdClient instanceof RemotePlacementDriverClient) {
HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions();
if (heartbeatOpts == null) {
heartbeatOpts = new HeartbeatOptions();
}
this.heartbeatSender = new HeartbeatSender(this);
if (!this.heartbeatSender.init(heartbeatOpts)) {
LOG.error("Fail to init [HeartbeatSender].");
return false;
}
}
this.startTime = System.currentTimeMillis();
LOG.info("[StoreEngine] start successfully: {}.", this);
return this.started = true;
}
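As an aside: when a Store should host several Regions from the start, the RegionEngineOptions list can be pre-populated with explicit key ranges before StoreEngine#init runs. A sketch using only the setters visible in the code above; the region ids and the boundary keys "g"/"t" are illustrative values:
import java.util.ArrayList;
import java.util.List;

import com.alipay.sofa.jraft.rhea.options.RegionEngineOptions;
import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
import com.alipay.sofa.jraft.util.BytesUtil;

public class MultiRegionConfigExample {

    // pre-partition the key space into three Regions:
    // [-inf, "g"), ["g", "t"), ["t", +inf)
    public static void configureRegions(final StoreEngineOptions storeOpts) {
        final List<RegionEngineOptions> rOptsList = new ArrayList<>();

        final RegionEngineOptions r1 = new RegionEngineOptions();
        r1.setRegionId(1L);
        r1.setEndKeyBytes(BytesUtil.writeUtf8("g"));
        rOptsList.add(r1);

        final RegionEngineOptions r2 = new RegionEngineOptions();
        r2.setRegionId(2L);
        r2.setStartKeyBytes(BytesUtil.writeUtf8("g"));
        r2.setEndKeyBytes(BytesUtil.writeUtf8("t"));
        rOptsList.add(r2);

        final RegionEngineOptions r3 = new RegionEngineOptions();
        r3.setRegionId(3L);
        r3.setStartKeyBytes(BytesUtil.writeUtf8("t"));
        rOptsList.add(r3);

        // StoreEngine#init will fill in raftGroupId, serverAddress,
        // initialServerList, etc. for each entry, as shown above
        storeOpts.setRegionEngineOptionsList(rOptsList);
    }
}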
(3) RegionEngineOptions initialization
- First check opts.getRegionEngineOptionsList() for emptiness; if it is empty, create a RegionEngineOptions and add it to rOptsList.
- Iterate over each resulting RegionEngineOptions and initialize its RaftGroupId, ServerAddress, InitialServerList, NodeOptions, and MetricsReportPeriod.
Note: the following code is from StoreEngine#init:
// init region options
List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
if (rOptsList == null || rOptsList.isEmpty()) {
// -1 region
final RegionEngineOptions rOpts = new RegionEngineOptions();
rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
rOptsList = Lists.newArrayList();
rOptsList.add(rOpts);
opts.setRegionEngineOptionsList(rOptsList);
}
final String clusterName = this.pdClient.getClusterName();
for (final RegionEngineOptions rOpts : rOptsList) {
rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
rOpts.setServerAddress(serverAddress);
if (Strings.isBlank(rOpts.getInitialServerList())) {
// if blank, extends parent's value
rOpts.setInitialServerList(opts.getInitialServerList());
}
if (rOpts.getNodeOptions() == null) {
// copy common node options
rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
.getCommonNodeOptions().copy());
}
if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
// extends store opts
rOpts.setMetricsReportPeriod(metricsReportPeriod);
}
}
(4) Store initialization
The Store is initialized by calling the pdClient's getStoreMetadata method:
final Store store = this.pdClient.getStoreMetadata(opts);
When FakePlacementDriverClient#getStoreMetadata is called, it:
- Gets the RegionEngineOptions list produced during the earlier initialization.
- Builds a Region list with the same size as the RegionEngineOptions list.
- Calls getLocalRegionMetadata on every element of the RegionEngineOptions list and adds the result to the Region list.
public Store getStoreMetadata(final StoreEngineOptions opts) {
final Store store = new Store();
final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size());
store.setId(-1);
store.setEndpoint(opts.getServerAddress());
for (final RegionEngineOptions rOpts : rOptsList) {
regionList.add(getLocalRegionMetadata(rOpts));
}
store.setRegions(regionList);
return store;
}
Now let's look at AbstractPlacementDriverClient#getLocalRegionMetadata, which:
- Ensures the regionId is within the allowed range.
- Initializes the Region.
- Converts initialServerList into peer objects.
- Adds the Region to the regionRouteTable routing table.
protected Region getLocalRegionMetadata(final RegionEngineOptions opts) {
final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId");
Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= "
+ Region.MIN_ID_WITH_MANUAL_CONF);
Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < "
+ Region.MAX_ID_WITH_MANUAL_CONF);
final byte[] startKey = opts.getStartKeyBytes();
final byte[] endKey = opts.getEndKeyBytes();
final String initialServerList = opts.getInitialServerList();
final Region region = new Region();
final Configuration conf = new Configuration();
// region
region.setId(regionId);
region.setStartKey(startKey);
region.setEndKey(endKey);
region.setRegionEpoch(new RegionEpoch(-1, -1));
// peers
Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank");
conf.parse(initialServerList);
region.setPeers(JRaftHelper.toPeerList(conf.listPeers()));
this.regionRouteTable.addOrUpdateRegion(region);
return region;
}
RegionEpoch carries two version numbers:
(1) confVer: the configuration version, incremented whenever a peer is added or removed
(2) version: the Region version, incremented on a split or merge
public RegionEpoch(long confVer, long version) {
this.confVer = confVer;
this.version = version;
}
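As mentioned in the architecture section, the PD senses Region changes through this epoch. A minimal sketch of that staleness rule (my own illustration, not the actual PD code; it assumes the obvious getConfVer/getVersion getters for the two fields above):
// a reported epoch supersedes a cached one if either counter has advanced
public static boolean hasChanged(final RegionEpoch reported, final RegionEpoch cached) {
    return reported.getConfVer() > cached.getConfVer()   // membership changed (peer added/removed)
           || reported.getVersion() > cached.getVersion(); // range changed (split/merge)
}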
Here is RegionRouteTable#addOrUpdateRegion:
public void addOrUpdateRegion(final Region region) {
Requires.requireNonNull(region, "region");
Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
final long regionId = region.getId();
final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey());
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.writeLock();
try {
this.regionTable.put(regionId, region.copy());
this.rangeTable.put(startKey, regionId);
} finally {
stampedLock.unlockWrite(stamp);
}
}
Let's look at a few of RegionRouteTable's fields:
- keyBytesComparator: a LexicographicByteArrayComparator, i.e. a lexicographic byte-array comparator
- stampedLock: a lock with better performance than a read-write lock
- rangeTable: a TreeMap (implementing NavigableMap) sorted by keyBytesComparator, mapping <startKey, regionId>
- regionTable: a HashMap mapping <regionId, region>
private static final Comparator<byte[]> keyBytesComparator = BytesUtil.getDefaultByteArrayComparator();
private final StampedLock stampedLock = new StampedLock();
private final NavigableMap<byte[], Long> rangeTable = new TreeMap<>(keyBytesComparator);
private final Map<Long, Region> regionTable = Maps.newHashMap();
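With these two maps, routing a key to its Region is a floor lookup on rangeTable followed by a get on regionTable. A simplified sketch of the idea (the real RegionRouteTable#findRegionByKey additionally takes the StampedLock and works on Region copies; the method name here is my own):
// simplified sketch: the Region owning `key` is the one whose startKey
// is the greatest startKey <= key in the ordered rangeTable
private Region findRegionByKeySketch(final byte[] key) {
    final java.util.Map.Entry<byte[], Long> entry = this.rangeTable.floorEntry(key);
    if (entry == null) {
        return null; // routing table is empty
    }
    return this.regionTable.get(entry.getValue());
}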
(5) RegionEngine initialization
StoreEngine#initAllRegionEngine
private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) {
Requires.requireNonNull(opts, "opts");
Requires.requireNonNull(store, "store");
//the base raft data path
String baseRaftDataPath = opts.getRaftDataPath();
if (Strings.isNotBlank(baseRaftDataPath)) {
try {
FileUtils.forceMkdir(new File(baseRaftDataPath));
} catch (final Throwable t) {
LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath);
return false;
}
} else {
baseRaftDataPath = "";
}
final Endpoint serverAddress = opts.getServerAddress();
//the RegionEngineOptions list and the Region list are index-aligned
final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
final List<Region> regionList = store.getRegions();
Requires.requireTrue(rOptsList.size() == regionList.size());
for (int i = 0; i < rOptsList.size(); i++) {
final RegionEngineOptions rOpts = rOptsList.get(i);
if (!inConfiguration(rOpts.getServerAddress().toString(), rOpts.getInitialServerList())) {
continue;
}
final Region region = regionList.get(i);
//if the region has no raft data path of its own, derive one
if (Strings.isBlank(rOpts.getRaftDataPath())) {
final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();
rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString());
}
Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
//create a RegionEngine from the Region
final RegionEngine engine = new RegionEngine(region, this);
if (engine.init(rOpts)) {
// each RegionKVService corresponds to one Region and only handles requests within that Region's range
final RegionKVService regionKVService = new DefaultRegionKVService(engine);
registerRegionKVService(regionKVService);
//put it into the ConcurrentMap<Long, RegionEngine> regionEngineTable
this.regionEngineTable.put(region.getId(), engine);
} else {
LOG.error("Fail to init [RegionEngine: {}].", region);
return false;
}
}
return true;
}
RegionEngine#init
A RegionEngine is an execution unit. It records the StoreEngine it belongs to and the corresponding Region. Since it is also an election node, it contains the state machine KVStoreStateMachine and the corresponding RaftGroupService, and starts the RpcServer inside for election and log replication.
public synchronized boolean init(final RegionEngineOptions opts) {
if (this.started) {
LOG.info("[RegionEngine: {}] already started.", this.region);
return true;
}
this.regionOpts = Requires.requireNonNull(opts, "opts");
//instantiate the state machine
this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);
// node options
NodeOptions nodeOpts = opts.getNodeOptions();
if (nodeOpts == null) {
nodeOpts = new NodeOptions();
}
//enable metrics when the report period is positive
final long metricsReportPeriod = opts.getMetricsReportPeriod();
if (metricsReportPeriod > 0) {
// metricsReportPeriod > 0 means enable metrics
nodeOpts.setEnableMetrics(true);
}
final Configuration initialConf = new Configuration();
if (!initialConf.parse(opts.getInitialServerList())) {
LOG.error("Fail to parse initial configuration {}.", opts.getInitialServerList());
return false;
}
//set the initial cluster configuration
nodeOpts.setInitialConf(initialConf);
nodeOpts.setFsm(this.fsm);
//set up the log, meta and snapshot paths
final String raftDataPath = opts.getRaftDataPath();
try {
FileUtils.forceMkdir(new File(raftDataPath));
} catch (final Throwable t) {
LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath);
return false;
}
if (Strings.isBlank(nodeOpts.getLogUri())) {
final Path logUri = Paths.get(raftDataPath, "log");
nodeOpts.setLogUri(logUri.toString());
}
if (Strings.isBlank(nodeOpts.getRaftMetaUri())) {
final Path meteUri = Paths.get(raftDataPath, "meta");
nodeOpts.setRaftMetaUri(meteUri.toString());
}
if (Strings.isBlank(nodeOpts.getSnapshotUri())) {
final Path snapshotUri = Paths.get(raftDataPath, "snapshot");
nodeOpts.setSnapshotUri(snapshotUri.toString());
}
LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.", this.region,
nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri());
final Endpoint serverAddress = opts.getServerAddress();
final PeerId serverId = new PeerId(serverAddress, 0);
final RpcServer rpcServer = this.storeEngine.getRpcServer();
//create the RaftGroupService and start the raft node
this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true);
this.node = this.raftGroupService.start(false);
//register the group's initial configuration in the RouteTable
RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf());
if (this.node != null) {
final RawKVStore rawKVStore = this.storeEngine.getRawKVStore();
final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor();
//RaftRawKVStore is RheaKV's RawKVStore implementation built on the Raft replicated state machine KVStoreStateMachine;
//this is RheaKV's entry point into the Raft flow
this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor);
//wrap requests for metrics collection
this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore);
// metrics config
if (this.regionMetricsReporter == null && metricsReportPeriod > 0) {
final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry();
if (metricRegistry != null) {
final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler();
// start raft node metrics reporter
this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) //
.prefixedWith("region_" + this.region.getId()) //
.withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) //
.outputTo(LOG) //
.scheduleOn(scheduler) //
.shutdownExecutorOnStop(scheduler != null) //
.build();
this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS);
}
}
this.started = true;
LOG.info("[RegionEngine] start successfully: {}.", this);
}
return this.started;
}
3. A Closer Look at MULTI-RAFT-GROUP
1. Capacity scalability
The pain point: the Leader as a single-point traffic bottleneck.
From the Raft algorithm we know that every update must go to the Leader first and is then replicated to the Followers. In practice, the Leader of a single Raft group is therefore a single-point traffic bottleneck that cannot absorb high traffic; moreover, every node holds the full data set, so node storage limits cap the total capacity and prevent scaling out.
Horizontal partitioning:
MULTI-RAFT-GROUP partitions the whole data set horizontally into multiple Regions to get past the disk bottleneck. Each Region has its own independent Raft group, with one Leader and one or more Followers, so the system scales horizontally: multiple nodes accept writes and share the write load. Each Region has multiple replicas, each stored on a different Store, together forming a Raft group.
2. Management under MULTI-RAFT-GROUP
Region partitioning logic
Range partitioning sorts keys by bytes and then splits the key space segment by segment. Queries over adjacent keys, such as scan, stay concentrated in as few Regions as possible, which hashing cannot offer; even a single Region split is easier to handle, since only some metadata changes and no large-scale data movement is involved.
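The scan-locality argument is easy to see in a toy example (my own illustration, with plain Strings instead of byte[] and made-up boundary keys): with range partitioning, one ordered floor lookup plus a sub-range of the table yields the few contiguous Regions covering a scan, while hash partitioning scatters adjacent keys across all Regions.
import java.util.NavigableMap;
import java.util.TreeMap;

public class RangeVsHashScan {

    public static void main(final String[] args) {
        // toy routing table: startKey -> regionId,
        // region 1 = ["", "g"), region 2 = ["g", "t"), region 3 = ["t", +inf)
        final NavigableMap<String, Long> rangeTable = new TreeMap<>();
        rangeTable.put("", 1L);
        rangeTable.put("g", 2L);
        rangeTable.put("t", 3L);

        // range partitioning: scanning ["h", "k") touches only region 2
        final String start = "h", end = "k";
        System.out.println("first region: " + rangeTable.floorEntry(start).getValue()); // 2
        System.out.println("other regions: " + rangeTable.subMap(start, false, end, false).values()); // []

        // hash partitioning: adjacent keys land on unrelated regions,
        // so the same scan would have to query every region
        for (final String key : new String[] { "h", "i", "j" }) {
            System.out.println(key + " -> region " + (Math.abs(key.hashCode()) % 3 + 1));
        }
    }
}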
Region splitting
Now for the code-level analysis:
KVStoreStateMachine#doSplit
private void doSplit(final KVStateOutputList kvStates) {
final byte[] parentKey = this.region.getStartKey();
for (final KVState kvState : kvStates) {
//the KVOperation carries the split parameters
final KVOperation op = kvState.getOp();
final long currentRegionId = op.getCurrentRegionId();
final long newRegionId = op.getNewRegionId();
final byte[] splitKey = op.getKey();
final KVStoreClosure closure = kvState.getDone();
try {
this.rawKVStore.initFencingToken(parentKey, splitKey);
this.storeEngine.doSplit(currentRegionId, newRegionId, splitKey);
if (closure != null) {
// null on follower
closure.setData(Boolean.TRUE);
closure.run(Status.OK());
}
} catch (final Throwable t) {
LOG.error("Fail to split, regionId={}, newRegionId={}, splitKey={}.", currentRegionId, newRegionId,
BytesUtil.toHex(splitKey));
setCriticalError(closure, t);
}
}
}
StoreEngine#doSplit
public void doSplit(final Long regionId, final Long newRegionId, final byte[] splitKey) {
try {
Requires.requireNonNull(regionId, "regionId");
Requires.requireNonNull(newRegionId, "newRegionId");
final RegionEngine parent = getRegionEngine(regionId);
// copy the parent Region
final Region region = parent.getRegion().copy();
final RegionEngineOptions rOpts = parent.copyRegionOpts();
region.setId(newRegionId);
region.setStartKey(splitKey);
region.setRegionEpoch(new RegionEpoch(-1, -1));
// update the new Region's configuration
rOpts.setRegionId(newRegionId);
rOpts.setStartKeyBytes(region.getStartKey());
rOpts.setEndKeyBytes(region.getEndKey());
rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(this.pdClient.getClusterName(), newRegionId));
rOpts.setRaftDataPath(null);
// set the raft data path for the new Region
String baseRaftDataPath = this.storeOpts.getRaftDataPath();
if (Strings.isBlank(baseRaftDataPath)) {
baseRaftDataPath = "";
}
rOpts.setRaftDataPath(baseRaftDataPath + "raft_data_region_" + region.getId() + "_"
+ getSelfEndpoint().getPort());
// build the new Region's RegionEngine
final RegionEngine engine = new RegionEngine(region, this);
// init the new RegionEngine
if (!engine.init(rOpts)) {
LOG.error("Fail to init [RegionEngine: {}].", region);
throw Errors.REGION_ENGINE_FAIL.exception();
}
// bump the old Region's version and shrink its endKey
final Region pRegion = parent.getRegion();
final RegionEpoch pEpoch = pRegion.getRegionEpoch();
final long version = pEpoch.getVersion();
pEpoch.setVersion(version + 1); // version + 1
pRegion.setEndKey(splitKey); // update endKey
//the following two lines give readers of pRegion a happens-before relationship,
//because a write to a ConcurrentMap happens-before every subsequent read of that map
this.regionEngineTable.put(region.getId(), engine);
registerRegionKVService(new DefaultRegionKVService(engine));
// update the local routing table regionRouteTable
this.pdClient.getRegionRouteTable().splitRegion(pRegion.getId(), region);
} finally {
this.splitting.set(false);
}
}
RegionRouteTable#splitRegion:
public void splitRegion(final long leftId, final Region right) {
Requires.requireNonNull(right, "right");
Requires.requireNonNull(right.getRegionEpoch(), "right.regionEpoch");
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.writeLock();
try {
final Region left = this.regionTable.get(leftId);
Requires.requireNonNull(left, "left");
final byte[] leftStartKey = BytesUtil.nullToEmpty(left.getStartKey());
final byte[] leftEndKey = left.getEndKey();
final long rightId = right.getId();
final byte[] rightStartKey = right.getStartKey();
final byte[] rightEndKey = right.getEndKey();
Requires.requireNonNull(rightStartKey, "rightStartKey");
Requires.requireTrue(BytesUtil.compare(leftStartKey, rightStartKey) < 0,
"leftStartKey must < rightStartKey");
if (leftEndKey == null || rightEndKey == null) {
Requires.requireTrue(leftEndKey == rightEndKey, "leftEndKey must == rightEndKey");
} else {
Requires.requireTrue(BytesUtil.compare(leftEndKey, rightEndKey) == 0, "leftEndKey must == rightEndKey");
Requires.requireTrue(BytesUtil.compare(rightStartKey, rightEndKey) < 0,
"rightStartKey must < rightEndKey");
}
final RegionEpoch leftEpoch = left.getRegionEpoch();
leftEpoch.setVersion(leftEpoch.getVersion() + 1);
left.setEndKey(rightStartKey);
this.regionTable.put(rightId, right.copy());
this.rangeTable.put(rightStartKey, rightId);
} finally {
stampedLock.unlockWrite(stamp);
}
}
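To tie KVStoreStateMachine#doSplit, StoreEngine#doSplit and splitRegion together, here is a toy replay of the bookkeeping (my own illustration with String keys and made-up values; the real code works on byte[], bumps the left Region's epoch version, and holds the StampedLock):
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SplitWalkthrough {

    public static void main(final String[] args) {
        final Map<Long, String[]> regionTable = new HashMap<>();       // regionId -> {startKey, endKey}
        final NavigableMap<String, Long> rangeTable = new TreeMap<>(); // startKey -> regionId
        regionTable.put(1L, new String[] { "a", "z" });                // region 1 = ["a", "z")
        rangeTable.put("a", 1L);

        // split region 1 at "m": the right half becomes new region 2
        final String splitKey = "m";
        final String[] left = regionTable.get(1L);
        final String[] right = { splitKey, left[1] }; // right inherits the parent's endKey
        left[1] = splitKey;                           // parent shrinks in place (epoch version +1 in the real code)
        regionTable.put(2L, right);
        rangeTable.put(splitKey, 2L);

        regionTable.forEach((id, r) -> System.out.println("region " + id + " = [" + r[0] + ", " + r[1] + ")"));
        System.out.println(rangeTable); // {a=1, m=2}
    }
}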
Region merging
Just as too much data calls for a split, Region merging applies when two or more contiguous Regions hold noticeably less data than the vast majority of Regions; such Regions can be merged into one. There is no code implementation for this yet.