转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/8260370.html
flink checkpoint 源码分析 (一)一文主要讲述了在JobManager端定时生成TriggerCheckpoint的代码部分,本文继续研究下TaskManager端如何处理收到的TriggerCheckpoint消息并执行对应的备份操作。
TriggerCheckpoint消息进入TaskManager的处理路径为 handleMessage -> handleCheckpointingMessage -> Task.triggerCheckpointBarrier
public void triggerCheckpointBarrier(
final long checkpointID,
long checkpointTimestamp,
final CheckpointOptions checkpointOptions) { final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp); if (executionState == ExecutionState.RUNNING && invokable != null) {
if (invokable instanceof StatefulTask) {
// build a local closure
final StatefulTask statefulTask = (StatefulTask) invokable;
final String taskName = taskNameWithSubtask;
final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
Runnable runnable = new Runnable() {
@Override
public void run() {
// set safety net from the task's context for checkpointing thread
LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try {
boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
checkpointResponder.declineCheckpoint(
getJobID(), getExecutionId(), checkpointID,
new CheckpointDeclineTaskNotReadyException(taskName));
}
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
} finally {
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
}
}
};
executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
}
else {
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask)); LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
taskNameWithSubtask, executionId); }
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId); // send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
}
}
在正常的情况下,triggerCheckpointBarrier会调用StreamTask内部实现的triggerCheckpoint()方法,并根据调用链条
triggerCheckpoint->performCheckpoint->checkpointState->CheckpointingOperation.executeCheckpointing
public void executeCheckpointing() throws Exception {
startSyncPartNano = System.nanoTime(); boolean failed = true;
try {
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
} if (LOG.isDebugEnabled()) {
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
checkpointMetaData.getCheckpointId(), owner.getName());
} startAsyncPartNano = System.nanoTime(); checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000); // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
runAsyncCheckpointingAndAcknowledge();
failed = false; if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}." +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
在executeCheckpointing方法里进行了两个操作,首先是对该task对应的所有StreamOperator对象调用checkpointStreamOperator(op)
checkpointStreamOperator代码:
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
if (null != op) {
// first call the legacy checkpoint code paths
nonPartitionedStates.add(op.snapshotLegacyOperatorState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions)); OperatorSnapshotResult snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions); snapshotInProgressList.add(snapshotInProgress);
} else {
nonPartitionedStates.add(null);
OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
snapshotInProgressList.add(emptySnapshotInProgress);
}
}
StreamOperator的snapshotState(long checkpointId,long timestamp,CheckpointOptions checkpointOptions)方法最终由它的子类AbstractStreamOperator给出了一个final实现
@Override
public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult(); CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions); try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,
getContainingTask().getCancelables())) { snapshotState(snapshotContext); snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture()); if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
} if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
} catch (Exception snapshotException) {
try {
snapshotInProgress.cancel();
} catch (Exception e) {
snapshotException.addSuppressed(e);
} throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
getOperatorName() + '.', snapshotException);
} return snapshotInProgress;
}
上述代码里的snapshotState(snapshotContext)方法在不同的最终operator中有自己的具体实现。
executeCheckpointing的第二个操作是然后是调用runAsyncCheckpointingAndAcknowledge执行
所有的state固化文件操作并返回acknowledgeCheckpoint给JobManager。
private static final class AsyncCheckpointRunnable implements Runnable, Closeable {
.....
..... if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
CheckpointingOperation.AsynCheckpointState.COMPLETED)) { owner.getEnvironment().acknowledgeCheckpoint(
checkpointMetaData.getCheckpointId(),
checkpointMetrics,
subtaskState);
补充,在上文提到的performCheckpoint方法内,调用checkpointState方法之前,flink会把预先把checkpointBarrier发送到下游task,以便下游operator尽快开始他们的checkpoint进程,
这也是flink barrier机制生成barrier的地方。
synchronized (lock) {
if (isRunning) {
// we can do a checkpoint // Since both state checkpointing and downstream barrier emission occurs in this
// lock scope, they are an atomic operation regardless of the order in which they occur.
// Given this, we immediately emit the checkpoint barriers, so the downstream operators
// can start their checkpoint work as soon as possible
operatorChain.broadcastCheckpointBarrier(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions); checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
try {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
catch (InterruptedException e) {
throw new IOException("Interrupted while broadcasting checkpoint barrier");
}
}
上述描述的触发checkpoint调用路径是针对source task的链路。对于其余非souce的operator,
方法链路为StreamInputProcessor/StreamTwoInputProcessor.processInput() ->barrierHandler.getNextNonBlocked()->processBarrier ->notifyCheckpoint->triggerCheckpointOnBarrier
参考文档: