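A checkpoint can be triggered in two ways: by CheckpointCoordinator at the source nodes, or by CheckpointBarrier alignment in downstream operators.
This article covers the first path, in which CheckpointCoordinator triggers the operators' checkpoint.
CheckpointCoordinator acts as the checkpoint coordinator for the whole job. It triggers checkpoints at the source nodes, manages the checkpoints of the entire job, and receives the Ack messages that the TaskManagers return once they have finished executing a checkpoint.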
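I. Starting CheckpointCoordinator
When the job's JobStatus changes to RUNNING, the CheckpointCoordinatorDeActivator listener is notified and starts the CheckpointCoordinator service.
The CheckpointCoordinatorDeActivator.jobStatusChanges() method, shown in the code, contains the following logic.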
> 1. When `newJobStatus == JobStatus.RUNNING`, it immediately calls
> coordinator.startCheckpointScheduler() to start the job-wide checkpoint scheduler
> inside CheckpointCoordinator. From then on, checkpoint triggering is coordinated by CheckpointCoordinator.
>
> 2. For any other JobStatus, it calls coordinator.stopCheckpointScheduler()
> to stop checkpointing for the current job.
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp,
                                 Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // Start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // Any other status: stop the checkpoint scheduler right away
            coordinator.stopCheckpointScheduler();
        }
    }
}
II. Starting the CheckpointScheduler thread
CheckpointCoordinator.startCheckpointScheduler() then calls scheduleTriggerWithDelay(), which submits a Runnable to the previously created checkpointCoordinatorTimer thread pool for execution at a fixed rate.
As the code shows:
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
        new ScheduledTrigger(),
        initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
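The start/stop lifecycle and the fixed-rate trigger can be tried out without a Flink cluster. The following is a minimal sketch using only the JDK's ScheduledExecutorService; PeriodicTriggerSketch, its methods, and the 10 ms interval are hypothetical names and values chosen for illustration, not Flink code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a periodic checkpoint trigger; not Flink code.
final class PeriodicTriggerSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger triggered = new AtomicInteger();
    private final long baseIntervalMillis;
    private ScheduledFuture<?> currentPeriodicTrigger;

    PeriodicTriggerSketch(long baseIntervalMillis) {
        this.baseIntervalMillis = baseIntervalMillis;
    }

    // Counterpart of startCheckpointScheduler(): register the periodic Runnable.
    synchronized void start(long initDelayMillis, CountDownLatch latch) {
        stop(); // make sure at most one periodic trigger is registered
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                () -> {
                    triggered.incrementAndGet(); // stands in for triggerCheckpoint(...)
                    latch.countDown();
                },
                initDelayMillis, baseIntervalMillis, TimeUnit.MILLISECONDS);
    }

    // Counterpart of stopCheckpointScheduler(): cancel the periodic Runnable.
    synchronized void stop() {
        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
    }

    void shutdown() {
        stop();
        timer.shutdownNow();
    }

    int triggeredCount() {
        return triggered.get();
    }

    // Runs the trigger until it has fired `times` times (or 5 s pass), then stops it.
    static int runUntil(int times) {
        PeriodicTriggerSketch sketch = new PeriodicTriggerSketch(10);
        CountDownLatch latch = new CountDownLatch(times);
        sketch.start(0, latch);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sketch.shutdown();
        }
        return sketch.triggeredCount();
    }
}
```

Calling PeriodicTriggerSketch.runUntil(3) lets the trigger fire at least three times before the timer is shut down. Cancelling the ScheduledFuture rather than the whole executor is what lets the scheduler be started again later on the same timer.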
III. Triggering the checkpoint
As the code shows, ScheduledTrigger is an inner class of CheckpointCoordinator that implements the Runnable interface. ScheduledTrigger.run() calls CheckpointCoordinator.triggerCheckpoint() to trigger and execute the checkpoint.
private final class ScheduledTrigger implements Runnable {
    @Override
    public void run() {
        try {
            // Call triggerCheckpoint() to trigger the checkpoint
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
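CheckpointCoordinator.triggerCheckpoint() contains a lot of logic, so only the main steps are covered here. The way CheckpointCoordinator triggers a checkpoint breaks down into the following parts.
1. Work before the checkpoint runs
- Build the array of Executions for the task instances that trigger the checkpoint. The tasksToTrigger array holds the ExecutionVertex elements that trigger the checkpoint, which in practice are all of the source nodes.
- Build the collection of ExecutionVertex elements that must send an Ack message, converted mainly from the tasksToWaitFor array.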
// Run the pre-trigger checks
synchronized (lock) {
    preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
}
// Array of Executions for the tasks that must trigger the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
// Walk tasksToTrigger and fill the executions array
for (int i = 0; i < tasksToTrigger.length; i++) {
    // Current execution attempt of the task
    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    if (ee == null) {
        // No current attempt: the task is not being executed, so abort with an exception
        LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job);
        throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    } else if (ee.getState() == ExecutionState.RUNNING) {
        // The execution is RUNNING: add it to the executions array
        executions[i] = ee;
    } else {
        // Any state other than RUNNING: abort with an exception
        LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
            job,
            ExecutionState.RUNNING,
            ee.getState());
        throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}
// Collect the tasks that must send an Ack message
Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
    new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
    Execution ee = ev.getCurrentExecutionAttempt();
    if (ee != null) {
        ackTasks.put(ee.getAttemptId(), ev);
    } else {
        LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
            ev.getTaskNameWithSubtaskIndex(), job);
        throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}
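The all-or-nothing nature of this pre-check is the important part: a single task that is not RUNNING aborts the whole checkpoint. A minimal sketch of that rule follows, with hypothetical stand-in types (TaskStub, State) and IllegalStateException used in place of Flink's CheckpointException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the pre-check; the types below are stand-ins, not Flink classes.
final class TriggerPrecheckSketch {
    enum State { CREATED, DEPLOYING, RUNNING, FINISHED }

    // A task to trigger; a null state models "no current execution attempt".
    static final class TaskStub {
        final String name;
        final State state;

        TaskStub(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    // Returns the tasks to trigger, or throws as soon as one of them is not RUNNING.
    static List<TaskStub> collectRunning(List<TaskStub> tasksToTrigger) {
        List<TaskStub> executions = new ArrayList<>(tasksToTrigger.size());
        for (TaskStub task : tasksToTrigger) {
            if (task.state != State.RUNNING) {
                // Stand-in for CheckpointException(NOT_ALL_REQUIRED_TASKS_RUNNING)
                throw new IllegalStateException("Task " + task.name
                        + " is not RUNNING but " + task.state + ". Aborting checkpoint.");
            }
            executions.add(task);
        }
        return executions;
    }
}
```

Failing fast here avoids creating a PendingCheckpoint that could never collect all of its acknowledgements.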
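2. Creating the PendingCheckpoint
Before the checkpoint is executed, a PendingCheckpoint object has to be built. As the name suggests, it represents a checkpoint that is still pending.
From the moment the checkpoint starts until the Task instances return their Ack messages, the checkpoint stays in the Pending state, so that it only counts as successful once it has been acknowledged.
The code does the following:
- Create checkpointStorageLocation, which defines where the state snapshot data is stored during the checkpoint.
- Create the PendingCheckpoint object.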
final CheckpointStorageLocation checkpointStorageLocation;
final long checkpointID;
try {
    // Obtain the checkpointID from checkpointIdCounter
    checkpointID = checkpointIdCounter.getAndIncrement();
    // Resolve the checkpointStorageLocation
    checkpointStorageLocation = props.isSavepoint() ?
        checkpointStorage
            .initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
        checkpointStorage.initializeLocationForCheckpoint(checkpointID);
}
// Part of the code is omitted
// Create the PendingCheckpoint object
final PendingCheckpoint checkpoint = new PendingCheckpoint(
    job,
    checkpointID,
    timestamp,
    ackTasks,
    masterHooks.keySet(),
    props,
    checkpointStorageLocation,
    executor);
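To make the Pending state concrete, here is a minimal sketch of an object that stays pending until every expected task has acknowledged. PendingCheckpointSketch and its String attempt IDs are hypothetical simplifications; Flink's PendingCheckpoint tracks considerably more (operator state, master state, statistics).

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, minimal model of a pending checkpoint; not Flink's PendingCheckpoint.
final class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final CompletableFuture<Long> completionFuture = new CompletableFuture<>();

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAck) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    // Records one Ack; returns false for unknown, duplicate, or late acknowledgements.
    synchronized boolean acknowledge(String attemptId) {
        if (completionFuture.isDone() || !notYetAcknowledged.remove(attemptId)) {
            return false;
        }
        if (notYetAcknowledged.isEmpty()) {
            // The last Ack arrived: the checkpoint leaves the pending state.
            completionFuture.complete(checkpointId);
        }
        return true;
    }

    // Aborts the checkpoint, for example when a timeout fires.
    synchronized void abort(String reason) {
        completionFuture.completeExceptionally(new IllegalStateException(reason));
    }

    CompletableFuture<Long> getCompletionFuture() {
        return completionFuture;
    }
}
```

The completion future plays the same role as the one returned at the end of triggerCheckpoint(): callers can wait on it without knowing how many tasks still have to report.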
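3. Triggering and executing the checkpoint
Inside CheckpointCoordinator.triggerCheckpoint(), the PendingCheckpoint is registered and the MasterHooks are invoked within a synchronized (lock) block; once the lock is released, the trigger request is sent to each Execution. The main steps are as follows.
- Invoke the MasterHooks. By implementing a MasterHook (MasterTriggerRestoreHook in the code), a job can prepare an external system or trigger an operation in it.
- Iterate over the Execution nodes in executions and check props.isSynchronous(). If it returns true, call triggerSynchronousSavepoint() to run the checkpoint synchronously; otherwise call triggerCheckpoint() to run it asynchronously.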
// Acquire the coordinator-wide lock
synchronized (lock) {
    // Re-run the pre-trigger checks
    preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
    LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp,
        job);
    // Register the checkpoint in the pendingCheckpoints map
    pendingCheckpoints.put(checkpointID, checkpoint);
    // Schedule the canceller that cleans up the checkpoint once it expires
    ScheduledFuture<?> cancellerHandle = timer.schedule(
        canceller,
        checkpointTimeout, TimeUnit.MILLISECONDS);
    // If the checkpoint has already been disposed, cancel the canceller again
    if (!checkpoint.setCancellerHandle(cancellerHandle)) {
        cancellerHandle.cancel(false);
    }
    // Invoke the master hooks
    for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
        final MasterState masterState =
            MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
                .get(checkpointTimeout, TimeUnit.MILLISECONDS);
        checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
    }
    Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
}
// Create the CheckpointOptions
final CheckpointOptions checkpointOptions = new CheckpointOptions(
    props.getCheckpointType(),
    checkpointStorageLocation.getLocationReference());
// Send the trigger request to each Execution in executions
for (Execution execution: executions) {
    if (props.isSynchronous()) {
        // Synchronous case: call triggerSynchronousSavepoint()
        execution.triggerSynchronousSavepoint(checkpointID, timestamp,
            checkpointOptions,
            advanceToEndOfTime);
    } else {
        // Otherwise call the asynchronous triggerCheckpoint()
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}
// Reset the counter of unsuccessful checkpoint triggers
numUnsuccessfulCheckpointsTriggers.set(0);
// Return the completion future of the PendingCheckpoint
return checkpoint.getCompletionFuture();
That completes the work of triggering a checkpoint in CheckpointCoordinator. The actual execution is delegated to Execution.
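The canceller scheduled with checkpointTimeout in the code above can be illustrated in isolation. This is a sketch under simplifying assumptions: a CompletableFuture stands in for the PendingCheckpoint, and CheckpointTimeoutSketch.run() is a hypothetical helper, not a Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the checkpoint timeout canceller; not Flink code.
final class CheckpointTimeoutSketch {

    // Returns true if the checkpoint completed, false if the canceller expired it first.
    static boolean run(boolean ackBeforeTimeout, long timeoutMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Long> completion = new CompletableFuture<>();
        try {
            // The canceller only has an effect while the future is still incomplete.
            ScheduledFuture<?> cancellerHandle = timer.schedule(
                    () -> {
                        completion.completeExceptionally(
                                new TimeoutException("checkpoint expired"));
                    },
                    timeoutMillis, TimeUnit.MILLISECONDS);
            // Once the checkpoint is finished either way, the canceller is not needed.
            completion.whenComplete((id, error) -> cancellerHandle.cancel(false));
            if (ackBeforeTimeout) {
                completion.complete(1L); // stands in for "all Acks received"
            }
            try {
                completion.join();
                return true;
            } catch (CompletionException e) {
                return false;
            }
        } finally {
            timer.shutdownNow();
        }
    }
}
```

With run(true, 10_000) the checkpoint completes long before the timeout and the canceller is cancelled; with run(false, 50) nothing acknowledges, so the canceller expires the checkpoint after about 50 ms.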
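IV. Checkpoint operations on the Task node
Execution.triggerCheckpoint() actually delegates to triggerCheckpointHelper(), which carries out the checkpoint for the Task node that the Execution corresponds to and, through the Task instance, triggers the checkpoint of the source node, as the code shows.
1. Preparing the trigger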
private void triggerCheckpointHelper(long checkpointId,
                                     long timestamp,
                                     CheckpointOptions checkpointOptions,
                                     boolean advanceToEndOfEventTime) {
    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    // Only a synchronous savepoint may advance the watermark to MAX
    if (advanceToEndOfEventTime
            && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException(
            "Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }
    // LogicalSlot currently assigned to this Execution
    final LogicalSlot slot = assignedResource;
    // A non-null LogicalSlot means the Execution is running normally
    if (slot != null) {
        // Obtain the TaskManagerGateway from the slot
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        // Call triggerCheckpoint() on the gateway
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(),
            checkpointId, timestamp,
            checkpointOptions,
            advanceToEndOfEventTime);
    } else {
        // No slot assigned: the Execution is no longer running, so nothing is triggered
        LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
    }
}
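The guard on advanceToEndOfEventTime is a small boolean condition that is easy to misread, so it helps to look at it on its own. In this sketch the two boolean parameters are hypothetical stand-ins for checkpointType.isSynchronous() and checkpointType.isSavepoint().

```java
// Hypothetical, standalone version of the argument check; not Flink code.
final class SavepointGuardSketch {

    // Throws unless the request is allowed to advance the watermark to MAX.
    static void validate(boolean advanceToEndOfEventTime, boolean synchronous, boolean savepoint) {
        if (advanceToEndOfEventTime && !(synchronous && savepoint)) {
            throw new IllegalArgumentException(
                    "Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }
    }

    // Convenience wrapper that reports the outcome as a boolean.
    static boolean isAllowed(boolean advanceToEndOfEventTime, boolean synchronous, boolean savepoint) {
        try {
            validate(advanceToEndOfEventTime, synchronous, savepoint);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

Requests that do not ask to advance the watermark always pass. A request that does ask passes only when it is both synchronous and a savepoint.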
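2. Calling TaskExecutor to execute the checkpoint
After TaskExecutor receives the checkpoint trigger request from CheckpointCoordinator, it immediately looks up the Task instance thread from the Execution information and calls the Task instance to trigger and execute the checkpoint of the source node. The logic of TaskExecutor.triggerCheckpoint() is as follows.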
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId,
        checkpointTimestamp, executionAttemptID);
    // Check the CheckpointType: only a synchronous savepoint may advance the watermark to MAX
    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() &&
            checkpointType.isSavepoint())) {
        throw new IllegalArgumentException(
            "Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }
    // Look up the Task for this Execution in taskSlotTable
    final Task task = taskSlotTable.getTask(executionAttemptID);
    // If the task exists, call triggerCheckpointBarrier()
    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp,
            checkpointOptions, advanceToEndOfEventTime);
        // Return an already completed CompletableFuture
        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        // The task is unknown: return a future completed with a CheckpointException
        final String message = "TaskManager received a checkpoint request for unknown task "
            + executionAttemptID + '.';
        log.debug(message);
        return FutureUtils.completedExceptionally(
            new CheckpointException(message,
                CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
}
V. Executing the checkpoint in StreamTask
Task.triggerCheckpointBarrier() relies on the triggerCheckpointAsync() method provided by AbstractInvokable to trigger and execute the checkpoint in StreamTask.
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    // Submit the checkpoint to the mailbox so that it runs asynchronously
    return mailboxProcessor.getMainMailboxExecutor().submit(
        () -> triggerCheckpoint(checkpointMetaData,
            checkpointOptions, advanceToEndOfEventTime),
        "checkpoint %s with %s",
        checkpointMetaData,
        checkpointOptions);
}
The main logic of StreamTask.triggerCheckpoint() is as follows.
boolean success = performCheckpoint(checkpointMetaData, checkpointOptions,
    checkpointMetrics, advanceToEndOfEventTime);
if (!success) {
    declineCheckpoint(checkpointMetaData.getCheckpointId());
}
return success;
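The combination of "submit to the mailbox, then perform or decline" can be sketched with plain JDK classes. The assumption here is that a single-threaded executor is a fair stand-in for the task's mailbox thread; MailboxTriggerSketch, its declined list, and the taskIsRunning flag are hypothetical and only mimic the control flow.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of triggerCheckpointAsync(); not Flink code.
final class MailboxTriggerSketch {
    // A single daemon thread stands in for the task's mailbox thread (an assumption).
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mailbox-sketch");
        t.setDaemon(true);
        return t;
    });
    // Checkpoint IDs that were declined because performCheckpoint() returned false.
    final List<Long> declined = Collections.synchronizedList(new ArrayList<>());

    // Enqueues the checkpoint so it runs on the mailbox thread, not on the caller's thread.
    Future<Boolean> triggerCheckpointAsync(long checkpointId, boolean taskIsRunning) {
        return mailbox.submit(() -> {
            boolean success = performCheckpoint(checkpointId, taskIsRunning);
            if (!success) {
                declined.add(checkpointId); // stands in for declineCheckpoint(...)
            }
            return success;
        });
    }

    // Placeholder: a real task would emit the barrier and snapshot its state here.
    private boolean performCheckpoint(long checkpointId, boolean taskIsRunning) {
        return taskIsRunning;
    }

    // Waits for the result and converts checked exceptions into an unchecked one.
    static boolean await(Future<Boolean> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void close() {
        mailbox.shutdown();
    }
}
```

Running every checkpoint on the same thread that processes records means the snapshot needs no extra locking against record processing, which is the point of routing the request through the mailbox.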
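StreamTask.performCheckpoint() carries out the checkpoint of the Task instance. Besides being reached through CheckpointCoordinator, this method is also called when a downstream operator triggers a checkpoint through CheckpointBarrier alignment, to execute the checkpoint of that specific Task.
The next article continues with the CheckpointBarrier alignment path and looks at how performCheckpoint() in StreamTask executes the checkpoint, snapshotting and persisting the state data.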
Reference: 《Flink设计与实现:核心原理与源码解析》, 张利兵