在分布式架构中,当某个节点出现故障,其他节点基本不受影响。这时只需要重启应用,恢复之前某个时间点的状态继续处理就可以了。这一切看似简单,可是在实时流处理中,我们不仅需要保证故障后能够重启继续运行,还要保证结果的正确性、故障恢复的速度、对处理性能的影响,在 Flink 中,有一套完整的容错机制(fault tolerance)来保证故障后的恢复,其中最重要的就是检查点(checkpoint)和 保存点(Savepoint

检查点(Checkpoint

1、什么是检查点

        发生故障之后,最简单的想法当然是重启机器、重启应用。由于是分布式的集群,即使一个节点无法恢复,也不会影响应用的重启执行。这里的问题在于,流处理应用中的任务都是有状态的,而为了快速访问这些状态一般会直接放在堆内存里;现在重启应用,内存中的状态已经丢失,就意味着之前的计算全部白费了,需要从头来过。

        所以我们需要把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”。遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。

2、检查点的保存

        在Flink中检查点的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。

        每一次的触发时间是当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;Kafka 就是满足这些要求的一个最好的例子。

Flink的检查点和保存点-LMLPHP

如上图,当“hello”“world”“hello”这批数据被处理完后,触发checkpoint,会将状态数据写入外部存储中。

3、从检查点恢复状态

        在运行流处理程序时,Flink 会周期性地保存检查点。当发生故障时,就需要找到最近一
次成功保存的检查点来恢复状态

Flink的检查点和保存点-LMLPHP

如上图,第5条数据hello在sum计算中出错,这里 Source 任务已经处理完毕,所以偏移量为 5Map 任务也处理完成了。而 Sum 任务在处理中发生了故障,此时状态并未保存。

接下来就需要从检查点来恢复状态了。具体的步骤为:

  1. 重启应用 :遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空;
  2. 读取检查点,重置状态:找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态 中。这样,Flink 内部所有任务的状态,就恢复到了保存检查点的那一时刻;
  3. 重放数据为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现;
  4. 续处理数据 :接下来,我们就可以正常处理数据了;

这样,就好像没有发生过故障一样;我们既没有丢掉数据也没有重复计算数据,这就保证了计算结果的正确性。在分布式系统中,这叫作实现了“精确一次”(exactly-once)的状态一致性保证。

4、检查点算法

4.1、检查点分界线(Barrier

Flink的检查点和保存点-LMLPHP

 在 JobManager 中有一个“检查点协调器”(checkpoint coordinator),专门用来协调处理检查点的相关工作。检查点协调器会定期向 TaskManager 发出指令,要求保存检查点(带着检查点 ID),TaskManager 会让所有的 Source 任务把自己的偏移量(算子状态)保存起来,并将带有检查点 ID 的分界线(barrier)插入到当前的数据流中,然后像正常的数据一样像下游传递,之后 Source 任务就可以继续读入新的数据了。

Barrier是种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的
“分界线”(Checkpoint Barrier)。当收到Barrier这个特殊数据的时候,当前算子就把当前的状态进行快照。所以barrier 可以理解为“之前所有数据的状态更改保存入当前检查点

4.2、分布式快照算法

通过在流中插入分界线(barrier),我们可以明确地指示触发检查点保存的时间。在一条单一的流上,数据依次进行处理,顺序保持不变,可是对于处理多个分区的传递时数据的顺序就会出现乱序的问题。

算法的核心就是两个原则

  • 当上游任务向多个并行下游任务发送 barrier 时,需要广播出去;
  • 而当多个上游任务向同一个下游任务传递 barrier 时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区的 barrier 都到齐,才可以开始状态的保存。

Flink的检查点和保存点-LMLPHP

检查点保存的算法具体过程如下:

  1. JobManager 会周期性地向每个 TaskManager 发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点。收到指令后,TaskManger 会在所有 Source 任务中插入一个分界线(barrier),并将偏移量保存到远程的持久化存储中
  2. 状态存入持久化存储之后,会返回通知给 Source 任务。Source 任务就会向 JobManager 
    确认检查点完成,然后像数据一样把 barrier 向下游任务传递
  3. Map 任务没有状态,所以直接将 barrier 继续向下游传递。这时由于进行了 keyBy 分区, 所以需要将 barrier 广播到下游并行的两个 Sum 任务。同时,Sum 任务可能收到来自上游两个并行 Map 任务的 barrier,所以需要执行“分界线对齐”操作
  4. 各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成
    之后,同样将 barrier 向下游继续传递,并通知 JobManager 保存完;
  5. 由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度。当出现背
    压( backpressure )时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕;
  6. 应用程序的所有任务收的状态保存完成,将各个状态组成一个完整的快照(相当于很多碎片组成一个完整的拼图),本次检查点已完成。
    保存点(Savepoint

保存点(Savepoint

除了检查点外,Flink 还提供了另一个非常独特的镜像保存功能——保存点。保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。

保存点的用途

  • 版本管理和归档存储:对重要的节点进行手动备份,设置为某一版本,归档(archive)存储应用程序的状态。
  • 更新 Flink 版本:目前 Flink 的底层架构已经非常稳定,所以当 Flink 版本升级时,程序本身一般是兼容的。这时不需要重新执行所有的计算,只要创建一个保存点,停掉应用、升级 Flink 后,从保存点重启就可以继续处理了。
  • 更新应用程序:我们不仅可以在应用程序不变的时候,更新 Flink 版本;还可以直接更新应用程序。前提是程序必须是兼容的,也就是说更改之后的程序,状态的拓扑结构和数据类型都是不变的,这样才能正常从之前的保存点去加载。
  • 调整并行度:如果应用运行的过程中,发现需要的资源不足或已经有了大量剩余,也可以通过从保存点重启的方式,将应用程序的并行度增大或减小。
  • 暂停应用程序:有时候我们不需要调整集群或者更新程序,只是单纯地希望把应用暂停、释放一些资源来处理更重要的应用程序。使用保存点就可以灵活实现应用的暂停和重启,可以对有限的集群资源做最好的优化配置。

使用保存点

创建保存点

//要在命令行中为运行的作业创建一个保存点镜像,只需要执行:
bin/flink savepoint :jobId [:targetDirectory]
//这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点
state.savepoints.dir: hdfs:///flink/savepoints
//当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
//由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:
bin/flink stop --savepointPath [:targetDirectory] :jobId

从保存点重启应用

//我们已经知道,提交启动一个 Flink 作业,使用的命令是 flink run;现在要从保存点重启一个应用,其实本质是一样的:
bin/flink run -s :savepointPath [:runArgs]
//这里只要增加一个-s 参数,指定保存点的路径就可以了,其他启动时的参数还是完全一样的。还有一个“Savepoint Path”,这就是从保存点启动应用的配置。
11-26 10:19