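The previous section covered CheckpointBarrier alignment. Once a CheckpointBarrier has been aligned, the next step is to trigger the Checkpoint on the StreamTask through the notifyCheckpoint() method.
I. Invoking StreamTask to perform the Checkpoint
As the code below shows, notifyCheckpoint() does the following.
> 1. Checks that toNotifyOnCheckpoint is not null.
> 2. Creates the CheckpointMetaData and CheckpointMetrics instances. CheckpointMetaData stores the Checkpoint's meta information, and CheckpointMetrics records the Checkpoint's monitoring metrics.
> 3. Triggers the Checkpoint on the operators in the StreamTask.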
protected void notifyCheckpoint(
        CheckpointBarrier checkpointBarrier,
        long bufferedBytes,
        long alignmentDurationNanos) throws Exception {
    if (toNotifyOnCheckpoint != null) {
        // Create the CheckpointMetaData object that stores the meta information
        CheckpointMetaData checkpointMetaData =
            new CheckpointMetaData(
                checkpointBarrier.getId(),
                checkpointBarrier.getTimestamp());
        // Create the CheckpointMetrics object that records the monitoring metrics
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
            .setBytesBufferedInAlignment(bufferedBytes)
            .setAlignmentDurationNanos(alignmentDurationNanos);
        // Call toNotifyOnCheckpoint.triggerCheckpointOnBarrier() to trigger the Checkpoint
        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
            checkpointMetaData,
            checkpointBarrier.getCheckpointOptions(),
            checkpointMetrics);
    }
}
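Note: StreamTask is the only subclass that implements the Checkpoint methods, so only a StreamTask can trigger the Checkpoint for the current Task instance.
Next, let's look at how the Checkpoint is executed.
1. Overall code flow of a Checkpoint
Whichever way a Checkpoint is triggered, it ends up calling StreamTask.performCheckpoint() to persist the state of the StreamTask instance.
StreamTask.performCheckpoint() first checks whether the current Task is running normally and then runs the Checkpoint logic through actionExecutor. The actual execution is shown below.
- Handling a Task that is not running: the else branch broadcasts a CancelCheckpointMarker downstream and returns false.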
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {
    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(),
        checkpointOptions.getCheckpointType(),
        getName());
    final long checkpointId = checkpointMetaData.getCheckpointId();
    if (isRunning) {
        // Run the Checkpoint logic through actionExecutor
        actionExecutor.runThrowing(() -> {
            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                setSynchronousSavepointId(checkpointId);
                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }
            // Preparation before the Checkpoint
            operatorChain.prepareSnapshotPreBarrier(checkpointId);
            // Send the checkpoint barrier to the downstream streams
            operatorChain.broadcastCheckpointBarrier(
                checkpointId,
                checkpointMetaData.getTimestamp(),
                checkpointOptions);
            // Snapshot the operators' state. This step is largely asynchronous,
            // so it does not block normal data processing in the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions,
                checkpointMetrics);
        });
        return true;
    } else {
        // If the Task is in any other state, broadcast a CancelCheckpointMarker downstream
        actionExecutor.runThrowing(() -> {
            final CancelCheckpointMarker message =
                new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            recordWriter.broadcastEvent(message);
        });
        return false;
    }
}
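The ordering inside performCheckpoint() is easy to lose in the full listing, so here is a minimal sketch of it. The class PerformCheckpointSketch and its string-based action log are hypothetical and stand in for the real operator chain; only the order of the steps is taken from the method above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a task; this is not Flink's StreamTask. */
final class PerformCheckpointSketch {

    /** Returns, in order, the actions taken for one checkpoint request. */
    static List<String> performCheckpoint(boolean isRunning, long checkpointId) {
        List<String> actions = new ArrayList<>();
        if (isRunning) {
            // 1. let operators do any work that must precede the barrier
            actions.add("prepareSnapshotPreBarrier:" + checkpointId);
            // 2. forward the barrier downstream
            actions.add("broadcastCheckpointBarrier:" + checkpointId);
            // 3. snapshot this task's own state
            actions.add("checkpointState:" + checkpointId);
        } else {
            // a task that is not running cancels the checkpoint downstream instead
            actions.add("broadcastCancelCheckpointMarker:" + checkpointId);
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(performCheckpoint(true, 7L));
        System.out.println(performCheckpoint(false, 8L));
    }
}
```

Because the barrier is forwarded before the task's own state is snapshotted, downstream tasks presumably do not have to wait for this task's snapshot before they start their own alignment.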
1.1. StreamTask.checkpointState()
Next, let's look at the implementation of StreamTask.checkpointState(), shown in the code below.
private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {
    // Create the CheckpointStreamFactory instance
    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
        checkpointMetaData.getCheckpointId(),
        checkpointOptions.getTargetLocation());
    // Create the CheckpointingOperation instance
    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
        this,
        checkpointMetaData,
        checkpointOptions,
        storage,
        checkpointMetrics);
    // Execute the Checkpoint
    checkpointingOperation.executeCheckpointing();
}
1.2. executeCheckpointing
As the code shows, CheckpointingOperation.executeCheckpointing() contains the following logic.
public void executeCheckpointing() throws Exception {
    // For each operator, create the OperatorSnapshotFutures object that performs its snapshot
    for (StreamOperator<?> op : allOperators) {
        checkpointStreamOperator(op);
    }
    // Some code omitted here
    startAsyncPartNano = System.nanoTime();
    checkpointMetrics.setSyncDurationMillis(
        (startAsyncPartNano - startSyncPartNano) / 1_000_000);
    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
        owner,
        operatorSnapshotsInProgress,
        checkpointMetaData,
        checkpointMetrics,
        startAsyncPartNano);
    // Register the runnable as a Closeable
    owner.cancelables.registerCloseable(asyncCheckpointRunnable);
    // Execute asyncCheckpointRunnable
    owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
}
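The split between a synchronous part (collecting one future per operator) and an asynchronous part (running those futures on a pool) can be illustrated with plain java.util.concurrent. This is only a sketch under stated assumptions: SyncAsyncSnapshotSketch, snapshotOperator, startCheckpoint and await are hypothetical names, and summing integers stands in for writing state to storage. It also copies the state eagerly, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical sketch; none of these names exist in Flink. */
final class SyncAsyncSnapshotSketch {

    /** Synchronous part: copy the live state now and defer the expensive work. */
    static RunnableFuture<Integer> snapshotOperator(List<Integer> liveState) {
        List<Integer> stableCopy = new ArrayList<>(liveState);
        return new FutureTask<Integer>(() -> {
            int written = 0; // stands in for writing the copy to durable storage
            for (int value : stableCopy) {
                written += value;
            }
            return written;
        });
    }

    /** Runs the synchronous part inline and hands the rest to the pool. */
    static Future<Integer> startCheckpoint(List<List<Integer>> operators, ExecutorService asyncPool) {
        List<RunnableFuture<Integer>> inProgress = new ArrayList<>();
        for (List<Integer> operatorState : operators) {
            inProgress.add(snapshotOperator(operatorState));
        }
        // Asynchronous part: a single job drives every deferred snapshot
        return asyncPool.submit(() -> {
            int total = 0;
            for (RunnableFuture<Integer> future : inProgress) {
                future.run();
                total += future.get();
            }
            return total;
        });
    }

    /** Convenience for callers that prefer an unchecked failure. */
    static int await(Future<Integer> future) {
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<Integer> state = new ArrayList<>(Arrays.asList(1, 2, 3));
        Future<Integer> result = startCheckpoint(Arrays.asList(state), pool);
        state.add(100); // processing continues; the snapshot is unaffected
        System.out.println(await(result));
        pool.shutdown();
    }
}
```

The point of the design is that the task thread only pays for the synchronous copy; anything added to the live state after startCheckpoint() returns is not part of that checkpoint.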
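1.3. Wrapping an operator's state snapshot in OperatorSnapshotFutures
As the code below shows, AbstractStreamOperator.snapshotState() wraps the current operator's state snapshot operations in an OperatorSnapshotFutures object. All OperatorSnapshotFutures are then triggered asynchronously on the asyncOperationsThreadPool thread pool. The main steps are as follows.
- Set KeyedStateRawFuture and OperatorStateRawFuture on snapshotInProgress; these handle the snapshot of raw state.
- If operatorStateBackend or keyedStateBackend is not null, set OperatorStateManagedFuture or KeyedStateManagedFuture from the backend's snapshot() method.
- Return the snapshotInProgress object, which wraps every snapshot operation the current operator needs to run.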
public final OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory) throws Exception {
    // Get the KeyGroupRange
    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
        keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    // Create the OperatorSnapshotFutures object
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    // Create the snapshotContext object
    StateSnapshotContextSynchronousImpl snapshotContext =
        new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables());
    try {
        snapshotState(snapshotContext);
        // Set KeyedStateRawFuture and OperatorStateRawFuture
        snapshotInProgress
            .setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress
            .setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
        // If operatorStateBackend is not null, set OperatorStateManagedFuture
        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend
                    .snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        // If keyedStateBackend is not null, set KeyedStateManagedFuture
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend
                    .snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        // Some code omitted here
    }
    return snapshotInProgress;
}
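As the code shows, the RunnableFuture objects for raw state and for managed state come from different places: the raw-state futures are obtained from snapshotContext, while the managed-state futures are returned by the snapshot() methods of the state backends.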
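To make the "four futures per operator" idea concrete, the following sketch models a holder with one optional future per state kind. OperatorFuturesSketch, runAll and deferred are hypothetical names, and strings stand in for state handles; Flink's real OperatorSnapshotFutures has a richer API that is not reproduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical holder loosely modelled on the four futures set in the method above. */
final class OperatorFuturesSketch {
    RunnableFuture<String> keyedStateManagedFuture;
    RunnableFuture<String> keyedStateRawFuture;
    RunnableFuture<String> operatorStateManagedFuture;
    RunnableFuture<String> operatorStateRawFuture;

    /** Wraps a result so that nothing is produced until the future is run. */
    static RunnableFuture<String> deferred(String handle) {
        return new FutureTask<String>(() -> handle);
    }

    /** Runs every future that was set and collects the results by name. */
    Map<String, String> runAll() {
        Map<String, RunnableFuture<String>> named = new LinkedHashMap<>();
        named.put("keyedManaged", keyedStateManagedFuture);
        named.put("keyedRaw", keyedStateRawFuture);
        named.put("operatorManaged", operatorStateManagedFuture);
        named.put("operatorRaw", operatorStateRawFuture);

        Map<String, String> results = new LinkedHashMap<>();
        for (Map.Entry<String, RunnableFuture<String>> entry : named.entrySet()) {
            RunnableFuture<String> future = entry.getValue();
            if (future == null) {
                continue; // in this sketch, an unset future means "no such state"
            }
            future.run();
            try {
                results.put(entry.getKey(), future.get());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        return results;
    }
}
```

Creating the holder is cheap; the work happens only when runAll() is called, which is what lets the caller choose the thread that pays for it.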
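1.4. Snapshotting operator state
As we have seen, all state snapshot operations are wrapped in OperatorSnapshotFutures objects and are finally triggered by the AsyncCheckpointRunnable.
Let's now look at how AsyncCheckpointRunnable is defined. As the code shows, the main logic of AsyncCheckpointRunnable.run() is as follows.
- Run all state snapshot operations, by creating an OperatorSnapshotFinalizer for each operator's OperatorSnapshotFutures.
- Get JobManagerOwnedState and TaskLocalState from finalizedSnapshots and store them in jobManagerTaskOperatorSubtaskStates and localTaskOperatorSubtaskStates respectively.
- Use the checkpointMetrics object to record how long the Checkpoint took and report it to the metrics system.
- If the AsyncCheckpointState can be switched from RUNNING to COMPLETED, call reportCompletedSnapshotStates() to report the Checkpoint result to the JobManager.
- If an exception occurs, call handleExecutionException() to handle it.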
public void run() {
    FileSystemSafetyNet.initializeSafetyNetForThread();
    try {
        // Create the TaskStateSnapshot objects
        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());
        TaskStateSnapshot localTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());
        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry :
                operatorSnapshotsInProgress.entrySet()) {
            OperatorID operatorID = entry.getKey();
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();
            // Create the OperatorSnapshotFinalizer object
            OperatorSnapshotFinalizer finalizedSnapshots =
                new OperatorSnapshotFinalizer(snapshotInProgress);
            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getJobManagerOwnedState());
            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getTaskLocalState());
        }
        final long asyncEndNanos = System.nanoTime();
        final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
        checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
        if (asyncCheckpointState.compareAndSet(
                CheckpointingOperation.AsyncCheckpointState.RUNNING,
                CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
            reportCompletedSnapshotStates(
                jobManagerTaskOperatorSubtaskStates,
                localTaskOperatorSubtaskStates,
                asyncDurationMillis);
        } else {
            LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                owner.getName(),
                checkpointMetaData.getCheckpointId());
        }
    } catch (Exception e) {
        handleExecutionException(e);
    } finally {
        owner.cancelables.unregisterCloseable(this);
        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
    }
}
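The compareAndSet(RUNNING, COMPLETED) call in run() is what decides whether the result is reported or dropped when the runnable races with a close. A minimal sketch of that guard follows. AsyncStateSketch is a hypothetical class, and the DISCARDED state used by close() is an assumption about what a concurrent close would set.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a report-once guard built on compare-and-set. */
final class AsyncStateSketch {
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile boolean reported;

    /** Called by the async worker once all snapshots have finished. */
    boolean complete() {
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            reported = true; // stands in for reporting to the coordinator
            return true;
        }
        return false; // closed earlier, so the result is dropped
    }

    /** Called when the checkpoint is cancelled (assumed behavior). */
    boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    boolean wasReported() {
        return reported;
    }
}
```

Only one of complete() and close() can win the transition out of RUNNING, so a snapshot is never both reported and discarded.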
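At this point the snapshot logic for operator state is essentially complete. The managed state of an operator is handled mainly through KeyedStateBackend and OperatorStateBackend.
Both KeyedStateBackend and OperatorStateBackend implement the SnapshotStrategy interface, which provides the method for taking state snapshots. Depending on the type of state backend, the main SnapshotStrategy implementations are HeapSnapshotStrategy and RocksDBSnapshotStrategy.
1.5. Persisting the state snapshot
Taking HeapSnapshotStrategy as an example, this section shows how a StateBackend persists state data as a snapshot. As the code shows, HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates() defines how each registered state, and its meta information, is processed.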
private void processSnapshotMetaInfoForAllStates(
        List<StateMetaInfoSnapshot> metaInfoSnapshots,
        Map<StateUID, StateSnapshot> cowStateStableSnapshots,
        Map<StateUID, Integer> stateNamesToId,
        Map<String, ? extends StateSnapshotRestore> registeredStates,
        StateMetaInfoSnapshot.BackendStateType stateType) {
    for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
            registeredStates.entrySet()) {
        // Assign the next free id to this state
        final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
        stateNamesToId.put(stateUid, stateNamesToId.size());
        StateSnapshotRestore state = kvState.getValue();
        if (null != state) {
            // Take a stable snapshot of the state and record its meta information
            final StateSnapshot stateSnapshot = state.stateSnapshot();
            metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
            cowStateStableSnapshots.put(stateUid, stateSnapshot);
        }
    }
}
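Two small idioms in this method are worth isolating: put(key, map.size()) hands out consecutive ids, and a snapshot is recorded only for states that are not null. The sketch below shows both with plain collections. StateIdAssignmentSketch is a hypothetical class, and it copies each list eagerly, which is a deliberate simplification and not the copy-on-write approach suggested by the name cowStateStableSnapshots.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; an eager copy stands in for a copy-on-write snapshot. */
final class StateIdAssignmentSketch {

    /**
     * Assigns ids 0, 1, 2, ... in iteration order and stores a stable copy
     * of every state that is not null into stableSnapshots.
     */
    static Map<String, Integer> assignIds(
            Map<String, List<Integer>> registeredStates,
            Map<String, List<Integer>> stableSnapshots) {
        Map<String, Integer> stateNamesToId = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : registeredStates.entrySet()) {
            // size() is evaluated before the insert, so the first id is 0
            stateNamesToId.put(entry.getKey(), stateNamesToId.size());
            List<Integer> state = entry.getValue();
            if (state != null) {
                stableSnapshots.put(entry.getKey(), new ArrayList<>(state));
            }
        }
        return stateNamesToId;
    }
}
```

Later changes to a registered state do not affect the copy held in stableSnapshots, which is the property a snapshot needs while the writer runs asynchronously.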
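II. How CheckpointCoordinator manages Checkpoints
1. Confirmation after a Checkpoint finishes
Once all operators in a StreamTask have finished snapshotting their state, the Task instance immediately sends the TaskStateSnapshot message to the CheckpointCoordinator on the management node, which carries out the remaining steps. As shown in the figure, the confirmation process after a Checkpoint finishes is as follows.
- Message delivery
- Managing the PendingCheckpoint
- Adding the CompletedCheckpoint
- Notifying that the Checkpoint has finished
- Synchronizing the notification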
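The "managing the PendingCheckpoint" step amounts to tracking which tasks have not acknowledged yet. The following is a minimal sketch of that bookkeeping; PendingCheckpointSketch and its string task ids are hypothetical and much simpler than Flink's PendingCheckpoint.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of acknowledgement tracking for one checkpoint. */
final class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToWaitFor) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    /** Returns true only when this acknowledgement was the last one outstanding. */
    boolean acknowledge(String taskId) {
        boolean wasOutstanding = notYetAcknowledged.remove(taskId);
        return wasOutstanding && notYetAcknowledged.isEmpty();
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    long getCheckpointId() {
        return checkpointId;
    }
}
```

A duplicate or unknown acknowledgement returns false, so the caller would finalize the checkpoint exactly once, on the acknowledgement that empties the set.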
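2. Triggering and completing the Checkpoint
After the CheckpointCoordinator receives the Ack message from a Task instance (which indicates that the Task's snapshot has finished), it triggers the completion of the Checkpoint. The code below is CheckpointCoordinator.completePendingCheckpoint(), which in turn calls PendingCheckpoint.finalizeCheckpoint(). It does the following.
1) Registers the operatorStates in sharedStateRegistry.
2) Finalizes the Checkpoint in pendingCheckpoint and produces a CompletedCheckpoint.
3) Adds the completedCheckpoint to completedCheckpointStore.
4) Removes the PendingCheckpoint for checkpointId from pendingCheckpoints and triggers the Checkpoint requests waiting in the queue.
5) Sends the CheckpointComplete message to every ExecutionVertex to notify the Task instances that this Checkpoint has completed.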
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
        throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;
    // First register the operatorStates in sharedStateRegistry
    Map<OperatorID, OperatorState> operatorStates =
        pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());
    // Finalize the Checkpoint in pendingCheckpoint and produce the CompletedCheckpoint
    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        }
        catch (Exception e1) {
            // On failure, abort and throw a CheckpointException
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint,
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }
            throw new CheckpointException(
                "Could not finalize the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }
        // After finalization, the PendingCheckpoint must have been discarded
        Preconditions.checkState(pendingCheckpoint.isDiscarded()
            && completedCheckpoint != null);
        // Add the completedCheckpoint to completedCheckpointStore
        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // If storing the completed checkpoint fails, clean it up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.",
                            completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });
            throw new CheckpointException(
                "Could not complete the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // Finally remove the PendingCheckpoint for checkpointId from pendingCheckpoints
        pendingCheckpoints.remove(checkpointId);
        // Trigger the Checkpoint requests waiting in the queue
        triggerQueuedRequests();
    }
    // Remember the checkpointId
    rememberRecentCheckpointId(checkpointId);
    // Drop the earlier Checkpoints
    dropSubsumedCheckpoints(checkpointId);
    // Record the completion time, used to enforce the minimum delay between Checkpoints
    lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).",
        checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
    // Notify every ExecutionVertex that the Checkpoint has completed
    final long timestamp = completedCheckpoint.getTimestamp();
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
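One step above, dropSubsumedCheckpoints(checkpointId), is shown only as a call. The sketch below illustrates one plausible reading of it, under the assumption that "subsumed" means pending checkpoints with a smaller id than the one that just completed. SubsumedCheckpointsSketch is a hypothetical class and ignores details such as checkpoints that must not be subsumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical sketch; assumes older pending checkpoints are dropped on completion. */
final class SubsumedCheckpointsSketch {
    private final TreeSet<Long> pending = new TreeSet<>();

    void trigger(long checkpointId) {
        pending.add(checkpointId);
    }

    /** Completes one checkpoint and returns the older pending ids it subsumes. */
    List<Long> complete(long checkpointId) {
        pending.remove(checkpointId);
        SortedSet<Long> older = pending.headSet(checkpointId); // live view of ids below checkpointId
        List<Long> dropped = new ArrayList<>(older);
        older.clear(); // clearing the view removes them from pending
        return dropped;
    }

    Set<Long> stillPending() {
        return new TreeSet<>(pending);
    }
}
```

With checkpoints 1 to 4 pending, completing checkpoint 3 drops 1 and 2 and leaves 4 pending, since a newer completed checkpoint makes the older in-flight ones redundant for recovery.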
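3. Notifying the TaskExecutor of CheckpointComplete
When the TaskExecutor receives the CheckpointComplete message from the CheckpointCoordinator, it calls Task.notifyCheckpointComplete() to pass the message to the target Task instance. The Task thread then forwards the CheckpointComplete message to the operators in the StreamTask.
The code is as follows.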
/**
 * Wraps notifyCheckpointComplete() in a RunnableWithException and submits it to the
 * mailbox, where it is fetched and executed with the highest priority in the
 * MailboxExecutor model.
 * notifyCheckpointComplete() ultimately runs inside the MailboxProcessor.
 */
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
    return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
        () -> notifyCheckpointComplete(checkpointId),
        "checkpoint %d complete", checkpointId);
}
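The effect of submitting a mail at the highest priority can be sketched with a single-threaded queue. This sketch uses a plain java.util.PriorityQueue, which is an assumption made for illustration and not how Flink's TaskMailbox is implemented. MailboxSketch and the priority constants URGENT and NORMAL are hypothetical.

```java
import java.util.PriorityQueue;

/** Hypothetical single-threaded mailbox; not Flink's TaskMailbox. */
final class MailboxSketch {
    static final int URGENT = 0; // hypothetical levels, lower value runs first
    static final int NORMAL = 1;

    private static final class Mail implements Comparable<Mail> {
        final int priority;
        final long sequence;
        final Runnable action;

        Mail(int priority, long sequence, Runnable action) {
            this.priority = priority;
            this.sequence = sequence;
            this.action = action;
        }

        @Override
        public int compareTo(Mail other) {
            int byPriority = Integer.compare(priority, other.priority);
            // same priority: keep submission order
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    private final PriorityQueue<Mail> queue = new PriorityQueue<>();
    private long nextSequence;

    void submit(int priority, Runnable action) {
        queue.add(new Mail(priority, nextSequence++, action));
    }

    /** Runs every queued mail on the calling thread, most urgent first. */
    void drain() {
        Mail mail;
        while ((mail = queue.poll()) != null) {
            mail.action.run();
        }
    }
}
```

Because all mails run on one thread, the checkpoint notification never executes concurrently with record processing, yet it does not have to wait behind mails that were queued earlier at normal priority.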
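Next, let's look at StreamTask.notifyCheckpointComplete() in detail, shown in the code below.
1) Get the operators in the current Task's operator chain and send each one the Checkpoint-complete message.
2) Get the TaskStateManager object and notify it that the Checkpoint has completed. This mainly calls TaskLocalStateStore to clean up local Checkpoint data that is no longer needed.
3) If the current Checkpoint is a synchronous Savepoint, finish and terminate the current Task instance directly, then call resetSynchronousSavepointId() to reset syncSavepointId to empty.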
private void notifyCheckpointComplete(long checkpointId) {
    try {
        boolean success = actionExecutor.call(() -> {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}",
                    getName());
                // Get every Operator in the current Task's operatorChain and
                // notify each one that the Checkpoint succeeded
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
                return true;
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}",
                    getName());
                return true;
            }
        });
        // Get the TaskStateManager and notify it that the Checkpoint has completed
        getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);
        // If this is a synchronous Savepoint, finish the current Task directly
        if (success && isSynchronousSavepointId(checkpointId)) {
            finishTask();
            // Reset to "notify" the internal synchronous savepoint mailbox loop.
            resetSynchronousSavepointId();
        }
    } catch (Exception e) {
        handleException(new RuntimeException("Error while confirming checkpoint", e));
    }
}
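After an operator receives the Checkpoint-complete message, it does whatever follow-up work it needs. By default, the base implementation in AbstractStreamOperator notifies keyedStateBackend so that it can do its own follow-up work.
An AbstractUdfStreamOperator instance checks whether the current userFunction implements CheckpointListener. If it does, the operator notifies the user function that the Checkpoint has completed.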
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
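A typical reason for a user function to listen for this callback is to make output visible only after the checkpoint that covers it has completed. The sketch below illustrates the idea without a Flink dependency. CompletionListenerSketch is a hypothetical local stand-in for a listener interface, and BufferingSinkSketch and UdfDispatchSketch are likewise invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical local stand-in for a checkpoint-completion callback interface. */
interface CompletionListenerSketch {
    void notifyCheckpointComplete(long checkpointId);
}

/** Hypothetical sink that commits records only after their checkpoint completes. */
final class BufferingSinkSketch implements CompletionListenerSketch {
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    private List<String> currentBatch = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    void invoke(String record) {
        currentBatch.add(record);
    }

    /** Seals the current batch under the given checkpoint id. */
    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, currentBatch);
        currentBatch = new ArrayList<>();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // commit every sealed batch up to and including this checkpoint
        NavigableMap<Long, List<String>> ready = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> batch : ready.values()) {
            committed.addAll(batch);
        }
        ready.clear();
    }
}

/** Mirrors the instanceof dispatch in the operator code above. */
final class UdfDispatchSketch {
    static boolean notifyCheckpointComplete(Object userFunction, long checkpointId) {
        if (userFunction instanceof CompletionListenerSketch) {
            ((CompletionListenerSketch) userFunction).notifyCheckpointComplete(checkpointId);
            return true;
        }
        return false;
    }
}
```

Functions that do not implement the listener interface are simply skipped, which keeps the callback optional for user code.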
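III. Summary of what we learned about state management
Having studied the state management source code, we can revisit the following scenario questions and, hopefully, take them apart with the ease of a skilled craftsman.
What is state for in Flink, and which scenarios involve it?
- Real-time aggregation: for example, computing the average sales over the past hour requires Flink state to hold all the sales data from that hour.
- Window operations: Flink SQL supports tumbling, sliding, and session windows, among others. All of them need Flink state to hold the data that falls within the window.
- State persistence and job recovery: after a real-time job fails, savepoints and checkpoints let it recover quickly from the last saved point.
- Multi-stream joins: Flink stores the data of at least one stream so that newly arriving records can be matched against it.
Second, studying the Flink state management source code gives a deeper understanding of the details and lays the theoretical groundwork for solving more complex problems:
- A deeper understanding of how each operator's state moves through a running job.
- Faster troubleshooting: when a real problem appears, you can quickly tell which part of the logic is at fault.
- Handling failures: state management is tied to Flink's fault-tolerance mechanism, so you can see how Flink keeps state consistent and recoverable when a failure occurs.
- Custom development: you can write your own state backend, or extend and optimize an existing one such as the RocksDB state backend.
- Performance tuning: once you understand how Flink processes and manages state efficiently, you can tune job performance and reduce resource consumption.
Reference: Zhang Libing, 《Flink设计与实现:核心原理与源码解析》 (Flink Design and Implementation: Core Principles and Source Code Analysis)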