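In Flink, the default StateBackend implementation is MemoryStateBackend, so this article uses MemoryStateBackend as the example for explaining how a StateBackend is designed and implemented.
It walks through how MemoryStateBackend creates three main components: KeyedStateBackend, OperatorStateBackend, and CheckpointStorage.
The other two state backends, FsStateBackend and RocksDBStateBackend, work much like MemoryStateBackend. They differ in the KeyedStateBackend and CheckpointStorage that they create internally.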
1. Creating a KeyedStateBackend with MemoryStateBackend
1.1. State initialization
The AbstractStreamOperator.keyedStatedBackend() method defines the logic for creating and initializing the KeyedStateBackend, as shown below.
    protected <K> AbstractKeyedStateBackend<K> keyedStateBackend(
            TypeSerializer<K> keySerializer,
            String operatorIdentifierText,
            PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
            CloseableRegistry backendCloseableRegistry,
            MetricGroup metricGroup) throws Exception {
        // Without a key serializer the operator is not keyed, so no keyed backend is needed.
        if (keySerializer == null) {
            return null;
        }
        String logDescription = "keyed state backend for " + operatorIdentifierText;
        // 1. Compute the KeyGroupRange assigned to this subtask.
        TaskInfo taskInfo = environment.getTaskInfo();
        final KeyGroupRange keyGroupRange =
                KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        taskInfo.getMaxNumberOfParallelSubtasks(),
                        taskInfo.getNumberOfParallelSubtasks(),
                        taskInfo.getIndexOfThisSubtask());
        // Make sure that streams opened while restoring state get closed.
        CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();
        backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);
        // Create the BackendRestorerProcedure.
        BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle>
                backendRestorer =
                        new BackendRestorerProcedure<>(
                                (stateHandles) -> stateBackend.createKeyedStateBackend(
                                        environment,
                                        environment.getJobID(),
                                        operatorIdentifierText,
                                        keySerializer,
                                        taskInfo.getMaxNumberOfParallelSubtasks(),
                                        keyGroupRange,
                                        environment.getTaskKvStateRegistry(),
                                        TtlTimeProvider.DEFAULT,
                                        metricGroup,
                                        stateHandles,
                                        cancelStreamRegistryForRestore),
                                backendCloseableRegistry,
                                logDescription);
        try {
            return backendRestorer.createAndRestore(
                    prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
        } finally {
            // The restore-only registry is no longer needed once restore has finished.
            if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
                IOUtils.closeQuietly(cancelStreamRegistryForRestore);
            }
        }
    }
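To make step 1 of the listing more concrete, here is a minimal, self-contained sketch of how a key-group range can be derived from the maximum parallelism, the actual parallelism, and the subtask index. The class name KeyGroupRangeSketch and the method rangeFor are hypothetical, and the arithmetic is an assumption about what KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex() does; check it against the Flink version you use.

```java
// Sketch only: a stand-in for the key-group range arithmetic, not Flink's own class.
final class KeyGroupRangeSketch {

    /** Returns {firstKeyGroup, lastKeyGroup}, both inclusive, for one subtask. */
    static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || maxParallelism < parallelism) {
            throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex out of range");
        }
        // Ceiling division for the start and floor division for the end, so that
        // consecutive subtasks receive adjacent, non-overlapping ranges.
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // total number of key groups
        int parallelism = 4;      // number of parallel subtasks
        for (int i = 0; i < parallelism; i++) {
            int[] r = rangeFor(maxParallelism, parallelism, i);
            System.out.println("subtask " + i + " -> key groups [" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

With 128 key groups and 4 subtasks, each subtask owns 32 consecutive key groups, which is why the keyed state backend only needs to restore the state handles that fall inside its own range.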
1.2. Creating the state
Next, let us look at how MemoryStateBackend.createKeyedStateBackend() is implemented.
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
        // Get the TaskStateManager instance.
        TaskStateManager taskStateManager = env.getTaskStateManager();
        // Create the HeapPriorityQueueSetFactory instance.
        HeapPriorityQueueSetFactory priorityQueueSetFactory =
                new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
        // Create a HeapKeyedStateBackendBuilder and use it to build the HeapKeyedStateBackend.
        return new HeapKeyedStateBackendBuilder<>(
                kvStateRegistry,
                keySerializer,
                env.getUserClassLoader(),
                numberOfKeyGroups,
                keyGroupRange,
                env.getExecutionConfig(),
                ttlTimeProvider,
                stateHandles,
                AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
                taskStateManager.createLocalRecoveryConfig(),
                priorityQueueSetFactory,
                isUsingAsynchronousSnapshots(),
                cancelStreamRegistry).build();
    }
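The numberOfKeyGroups and keyGroupRange arguments passed to the builder determine which keys the resulting heap backend is responsible for. The following is a rough sketch of that idea, not Flink's HeapKeyedStateBackend: the class HeapKeyedStateSketch and all of its methods are hypothetical, and the key-to-key-group mapping is simplified to a plain modulo of hashCode(), whereas Flink applies an additional hash step before the modulo.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a hypothetical heap-based keyed store partitioned by key group.
final class HeapKeyedStateSketch<K, V> {
    private final int numberOfKeyGroups;
    private final int firstKeyGroup;
    private final int lastKeyGroup;
    // One in-memory table per key group owned by this subtask.
    private final List<Map<K, V>> tables = new ArrayList<>();

    HeapKeyedStateSketch(int numberOfKeyGroups, int firstKeyGroup, int lastKeyGroup) {
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.firstKeyGroup = firstKeyGroup;
        this.lastKeyGroup = lastKeyGroup;
        for (int g = firstKeyGroup; g <= lastKeyGroup; g++) {
            tables.add(new HashMap<>());
        }
    }

    /** Simplified assignment (assumption): plain modulo instead of Flink's hashing. */
    int keyGroupOf(K key) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    void put(K key, V value) {
        tableFor(key).put(key, value);
    }

    V get(K key) {
        return tableFor(key).get(key);
    }

    private Map<K, V> tableFor(K key) {
        int group = keyGroupOf(key);
        if (group < firstKeyGroup || group > lastKeyGroup) {
            throw new IllegalArgumentException(
                    "key group " + group + " is not owned by this backend");
        }
        return tables.get(group - firstKeyGroup);
    }
}
```

Keeping one table per key group is what makes rescaling tractable in this model: when the parallelism changes, whole key groups can be handed to a different subtask without rehashing individual keys.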
2. Creating an OperatorStateBackend with MemoryStateBackend
Creating an OperatorStateBackend follows much the same process as creating a KeyedStateBackend. The AbstractStreamOperator.operatorStateBackend() method implements it.
    protected OperatorStateBackend operatorStateBackend(
            String operatorIdentifierText,
            PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
            CloseableRegistry backendCloseableRegistry) throws Exception {
        String logDescription = "operator state backend for " + operatorIdentifierText;
        // Make sure that streams opened while restoring state get closed.
        CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();
        backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);
        BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle>
                backendRestorer =
                        new BackendRestorerProcedure<>(
                                (stateHandles) -> stateBackend.createOperatorStateBackend(
                                        environment,
                                        operatorIdentifierText,
                                        stateHandles,
                                        cancelStreamRegistryForRestore),
                                backendCloseableRegistry,
                                logDescription);
        try {
            return backendRestorer.createAndRestore(
                    prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
        } finally {
            // The restore-only registry is no longer needed once restore has finished.
            if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
                IOUtils.closeQuietly(cancelStreamRegistryForRestore);
            }
        }
    }
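Here, prioritizedOperatorSubtaskStates is the operator-specific historical state that the TaskStateManager looks up by OperatorID. From it, the current operator's PrioritizedManagedOperatorState can be obtained, and that state data is then used to restore the operator's state in the OperatorStateBackend.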
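The word "prioritized" suggests that several restore alternatives may exist, ordered by preference. The sketch below illustrates one plausible way a restorer could walk such a list: try each alternative in order and fall back to the next one on failure. It is an assumption about the general approach, not a copy of BackendRestorerProcedure; the class RestoreWithFallbackSketch, its method signature, and the string-based "state" are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Sketch only: hypothetical helper that restores from the first usable alternative.
final class RestoreWithFallbackSketch {

    /**
     * Tries each alternative in priority order and returns the first backend that
     * can be built. With no alternatives, builds a backend from the empty state.
     */
    static <S, B> B createAndRestore(
            List<S> prioritizedAlternatives, Function<S, B> backendFactory, S emptyState) {
        if (prioritizedAlternatives.isEmpty()) {
            return backendFactory.apply(emptyState);
        }
        RuntimeException lastFailure = null;
        for (S alternative : prioritizedAlternatives) {
            try {
                return backendFactory.apply(alternative);
            } catch (RuntimeException e) {
                // Remember the failure and fall through to the next alternative.
                lastFailure = e;
            }
        }
        throw new IllegalStateException(
                "Could not restore from any of the "
                        + prioritizedAlternatives.size() + " alternatives",
                lastFailure);
    }

    public static void main(String[] args) {
        // Hypothetical factory: fails for state whose name marks it as corrupt.
        Function<String, String> factory = state -> {
            if (state.contains("corrupt")) {
                throw new IllegalStateException("cannot read " + state);
            }
            return "backend(" + state + ")";
        };
        List<String> alternatives = Arrays.asList("local-corrupt", "remote");
        System.out.println(createAndRestore(alternatives, factory, "empty"));
    }
}
```

In this model the factory plays the role of the lambda passed to BackendRestorerProcedure in the listings above, which calls createKeyedStateBackend() or createOperatorStateBackend() with one set of state handles.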
3. Creating a CheckpointStorage with MemoryStateBackend
The createCheckpointStorage() method simply creates a MemoryBackendCheckpointStorage instance and returns it; no further steps are involved.
    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(),
                getSavepointPath(), maxStateSize);
    }
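The maxStateSize argument hints that memory-backed checkpoint storage bounds how much state a single checkpoint stream may hold. As a hedged illustration of that idea, the sketch below buffers written bytes in memory and rejects the result if it exceeds a configured limit. The class BoundedMemoryStreamSketch is hypothetical, and checking the limit only when the stream is closed is an assumption rather than a statement about MemoryBackendCheckpointStorage.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Sketch only: hypothetical in-memory checkpoint stream with a size limit.
final class BoundedMemoryStreamSketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxStateSize;

    BoundedMemoryStreamSketch(int maxStateSize) {
        if (maxStateSize < 0) {
            throw new IllegalArgumentException("maxStateSize must not be negative");
        }
        this.maxStateSize = maxStateSize;
    }

    /** Appends serialized state to the in-memory buffer. */
    void write(byte[] data) {
        buffer.write(data, 0, data.length);
    }

    /** Returns the buffered bytes, or fails if they exceed the configured limit. */
    byte[] closeAndGetBytes() throws IOException {
        if (buffer.size() > maxStateSize) {
            throw new IOException(
                    "State size " + buffer.size()
                            + " exceeds the maximum of " + maxStateSize + " bytes");
        }
        return buffer.toByteArray();
    }
}
```

A limit of this kind is one reason an in-memory backend suits small state and local testing better than large production state.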
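Reference: 《Flink设计与实现:核心原理与源码解析》 (Flink Design and Implementation: Core Principles and Source Code Analysis), 张利兵 (Zhang Libing)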