一、背景:流作业动态调控
流数据处理是非常重要的一种数据处理方式,它在各个领域都有广泛的应用,比如机器学习、数据分析和实时事件处理以及实时交易等领域。流处理拥有低延迟和高吞吐量的特性,它被大规模部署成以流任务实例 stream task 构成的流作业 stream job 并行处理输入的数据流,流作业会部署成并行的流任务,这些流任务实例被中间流连接,并形成一个有向无环图。
流数据的并行处理是通过将输入数据在并行任务之间进行分区,然后每个任务独立处理分配的分区任务实时实现的,因为流作业是长期执行且会随着时间抖动,而不同的流作业有不同的性能需求,比如实时交易任务对延迟很敏感,而一些数据分析的任务对吞吐量要求很高。为了达到不同流处理作业的性能要求,动态重配置流任务的技术很关键。
常见的数据抖动有如下几类:
- 第一,输入速率的变化。流作业是长期执行,而数据流的输入速率会不可预测地发生动态变化,导致静态分配的资源无法低延迟高吞吐地处理数据流;
- 第二,数据倾斜。流数据的数据分布会动态变化,比如某个数据的出现频率增大会导致对应 stream task 的工作负载变大,延迟变大;
- 第三,新兴事件的产生。流数据中可能会出现新兴事件或者数据,这种数据无法被当前执行逻辑正确地执行。比如新型诈骗交易需要通过新的规则才能检测到。
针对不同的数据抖动,有不同类型的重配置技术来优化流作业,从而在保证资源利用率的同时,以高吞吐低延时的性能来处理流数据。
- 针对输入数列的变化,可以通过 scaling 的方式动态伸缩资源,提高吞吐量、降低延迟;
- 针对数据倾斜,可以通过 load balancing 的方式来重新分布并行执行流任务之间的工作负载,以重新达到负载均衡;
- 对于新兴事件的处理,可以通过 change of logic 的方式来更新流任务的执行逻辑,从而可以正确地处理新兴事件和数据。
有了不同类型的数据抖动和重配置技术后,需要考虑下一个问题就是如何动态地检测数据抖动,并选择合适的方法来调控流任务。为了解决这个的问题,通常是通过设计一个控制器来对任务进行动态重配置,控制器主要通过实时监听流作业分析症状,然后针对不同的症状修改不同的流作业配置,来做性能优化。
这个过程分为三步:监听、诊断、重配置。
- 首先控制器可以实时监听流任务,目前流作业的控制器主要通过监听系统层面的metrics比如CPU utilization,或应用层面的metrics比如端到端的数据处理延迟、吞吐量积压等,来进行建模分析和策略判断;
- 然后控制器通过控制策略来诊断症状,控制策略可以通过预定义的规则,比如CPU利用率高于一定阈值就执行scaling out,或进行模型分析,比如预测需要达到的资源分配来诊断存在的问题;
- 最后控制器选择不同类型的重配置方法去动态优化流作业。
为了减少为不同流作业实现控制器的工程开销,需要有一个控制平台来对流作业进行托管。控制平台封装了 metrics 和重配置方法,并且对外提供相应的 API,从而开发者可以在流作业部署好之后通过在控制平台提交控制器,对流作业进行托管。这样的控制器也包含了自定义的控制策略,并且可以直接使用控制平台的 API 实现 metrics 的采集和重配置,隐藏了系统底层的处理逻辑,简化了控制器的设计和开发。
大部分流处理系统都封装了比较成熟的 metrics 系统,因此控制平台可以基于原有系统 API 实现 metrics 的采集,然而动态重配置的支持仍是一个较大的挑战。
二、挑战:兼顾普适、高效和易用
动态重配置的控制平台应当具有三种性质:
- 普适性,不同类型的控制策略需要使用不同类型的重配置方法;
- 高效性,重配置的执行应在短时间内完成,并且尽量不阻塞原数据处理;
- 易用性,API 应简单易用,用户调用时无需知道系统底层逻辑。
但是目前已有的解决方案只能满足上述部分性质,比如 Flink 支持动态地对流作业进行重配置,并提供了简单易用的 online interface 为用户实现控制器流作业的动态重配置。通过修改源代码和重新提交流作业的方式,Flink 的原生支持具有很强的普适性和应用性。然而重新部署也会带来很大的开销。比如资源重分配和全局的状态恢复。
Flink 重配置的具体执行流程如下:首先 JobManager 会触发一个 Savepoint 到整个流作业的 pipeline 上,Savepoint 完成之后,当前流作业的 global snapshots 将会返回到 JobManager 中,JobManager 在收到所有的 snapshots 后,终止当前的 pipeline,然后以新的配置重新部署流作业,并从当前的 Savepoint 恢复状态重新开始。
三、设计:以 Task 为中心的系统设计
为了满足重配置的三种性质,我们将介绍 Trisk:以 Task 为中心的流作业控制平台。
上图是 Trisk 的系统架构,它支持对流处理的重配置进行定义和实现,提供了以 Task 为中心的配置抽象,这个抽象包含了当前流作业三个维度的执行配置,并且基于抽象封装了原子操作,使得配置方法可以通过在抽象上组合原子操作来定义。为了提高效率,不同于 Flink 本身提供的宗旨和重启机制,Trisk 采用了部分暂停和恢复的技术来执行重配置,并且它的封装可以进一步利用 Flink 系统中的 Checkpoint 机制来实现一致性。同时 Trisk 提供了易于使用的编程 API,有预先定义好的常用重配置 API,还将原子操作封装为 API 来让用户自定义重配置。
Trisk的工作流程如下:
Trisk runtime 维护了 restful API,用户可以通过接口提交控制逻辑代码。接着由 Trisk runtime 编译代码并生成对应的控制策略,它会根据当前流作业的 metrics 做诊断和重配置决策。控制策略诊断到当前运行的流作业的数据抖动后,会通过与 Trisk runtime 交互来对流作业进行重配置。
其过程如下:首先控制策略会从 Trisk runtime 中获取一个 Trisk 配置抽象,用来获取当前流作业每个 task 的配置情况,然后会根据诊断结果使用不同类型的原子操作来对 Trisk 抽象进行更新。比如,如果判断出了输入速率增高的问题,控制策略将会通过分配更多的资源来部署新的 task,并且重新分配 task 之间的工作量,来增加流作业的吞吐量。最后控制策略会通过把更新好的 Trisk 抽象送回到 Trisk runtime 中,Trisk runtime 根据更新好的配置对流作业执行重配置优化 。
Trisk 重配置的执行是通过与底层的流系统进行交互来实现的,采用了部分暂停与恢复的方法来实现工作流程,因此可以避免终止整个流作业的情况下保持一致性,并且只会对部分 task 进行更新来降低时间开销。整个过程可以分为三步:prepare-sync-update。
其流程如下:prepare 阶段,流系统基于更新后的 Trisk 抽象,找出被更新的受影响的 task,并准备这些 task 更新后的实际配置;sync 阶段,为保证数据一致性,执行期间需全局同步流作业并暂停受影响的 task,不受影响的 task 可以继续执行。这里通过 Flink 的 checkpoint barrier 机制实现这个同步过程;update 阶段,受影响的 task 将被独立更新,并在更新完成后继续执行。
Trisk的三维抽象源自于流任务的三个步骤:
- 第一步:流作业提交到流系统时,会被封装成一个 Logical Graph 里面包含了流任务的执行逻辑,其中顶点 operator 里包含了 User Defined Function,边表示 operator 之间的中间数据流,每个 operator 会使用 UDF 来处理输入的数据流,并生成输出流,流入后面的 operator。
- 第二步:Logical Graph 的每一个 operator 会并行运行一定数量的 stream task,且输入数据流会被分配到不同的 stream task 并行执行。每个 stream task 分配到的输入数据流被称为该 task 的工作负载配置。
- 第三步,这些并行的 stream task 会被部署到服务器中物理执行,每个 stream task 都会在一台机器上分配到一定的资源比如 CPU 和内存,这样的资源分配描述了 stream task 的 resource 配置。
因此 Trisk 的三维抽象就是包含了以 task 为中心的 execution logic,workload、resources 配置,最终形成了一个有向无环图,存放在了 Trisk runtime 中。
我们对抽象中的每一个维度的更新都封装了原子操作,通过对三维抽象中每一个维度执行原子操作,可以细粒度地重配置流作业,从而满足重配置的普适性。比如 scaling 可以通过分配 resources 来重配置新的 task,并重分配并行任务之间的 workload 来实现。
上图展示了一个 scaling out 的例子,由于输入速率的不均匀上升导致 task2 的负载增大、延迟上升,且 task3 的利用率也很高,因此我们需要通过执行 scaling out 来分配一个新的执行任务 task5 并转移一部分 task2 的 workload 到 task5 上,来让当前流作业能继续低延迟高吞吐地处理输入流数据。
Trisk 提供了常用的重配置 API,对应着我们之前提到的三种重配置方法:scaling、load balancing、change of logic,用户可以使用提供的 API 在 Trisk 上实现控制策略。这些控制策略可以编译为运行在 Trisk runtime 上的线程来动态管理流作业。
上图例子显示了一个可以实现在流作业动态负载均衡的控制策略 load balance 的实现。它通过每秒检测 task 的工作负载,比如监听每个 task 的处理数据量的分布,并在 task 间的分布发生变化时重新分配 task 的 workload 来实现负载均衡。
同时用户也可以通过基于三维抽象的原子操作来定义新的重配置方法。我们将三种原子操作封装成了 assignLogic、assignWorkload、assignedResource 三个 API。
上图展示了 scaling 重配置方法基于对抽象执行原子操作的代码实现。通过 assignResource 来为新创建的任务分配资源,然后通过 assignWorkload 重新分配并行任务之间的工作负载来实现。
四、实现:基于 Flink 的 Barrier 机制
Trisk 控制平台是单独运行的一个后台服务,它封装提供了重配置 API。在 Flink 系统层中,也加入了一些新的组件来和 Trisk runtime 交互,并且高效执行对流作业的重配置。在 runtime 层中,controller 保存了用户自定义的控制策略和重配置方法。StreamManager 是 Trisk 的核心,它为用户提供了 API 并且维护了 web service 来接收新的 controller。在系统层中,JobReconfigCoordinator 维护 Trisk 抽象到 Flink 物理配置的映射,并协调执行重配置来保证流作业在重配置前后的数据一致性。
每个 StreamTask 会维护一个 TaskConfigManager,它会管理并更新对应 StreamTask 中的配置,来实现重配置。
Flink 内部的组件架构如上图。JobReconfigCoordinator 存在于 Flink 的 JobManager 中,并且在每个 StreamTask 上都维护了一个 TaskConfigManager。JobReconfigCoordinator 和 TaskConfigManager 可以通过 Flink 网络层进行远程交互,实现控制逻辑。
上图展示了重配置在 Flink 上的执行总览。
在 prepare 阶段,Coordinator 会收到 Trisk runtime 层分析好的抽象并准备好 StreamTask 的新配置。比如对 scaling 分配资源是通过获取一个新的 resource slot 实现,重分配 workload 是通过更新上游 task 的 result partition 和下游 task input gate 来实现的。对于 stateful 的 task 来,重分配 workload 还需要更新 task state backend。
在 synchronize 阶段,Coordinator 会利用 Flink 原有的 checkpoint barrier 机制,对受影响的 task 进行同步和暂停从而保证数据的一致性,其过程主要是通过从 source task 开始向整个 pipeline 发送 barrier,受影响的 StreamTask 会在接收到 barrier 之后暂停并等待来自 Coordinator 的更新指令。
同步完成后进入 update 阶段,Coordinator 会通知所有受影响的 task 去并行执行 update 来更新自己的配置。StreamTask 在更新完自己的配置后会自动恢复执行,并与上下游重新连接。
具体的实现细节有如下几项:
首先,对 Trisk abstraction 内部的配置和 Flink 的 JobGraph、ExecutionGraph 做了映射,因此 prepare 阶段中 Coordinator 会去更新对应的 JobGraph 和 ExecutionGraph,然后通过 Flink 的 barrier 机制实现了重配置执行中的同步来保证数据一致性。
其次,每个 task 的原子操作都尽量利用 Flink 原有的机制对 StreamTask 进行动态修改。比如 assignWorkload 是通过重新初始化一个 state backend 再重新更新上游 task 的 result partition 和当前 task 的 input gate 实现的。
重配置的具体执行流程分为以下几步:
首先在 prepare 阶段,JobReconfigCoordinator 会更新 JobGraph 和 ExecutionGraph。然后根据更新情况标记受影响的 StreamTask。prepare 完成后,Coordinator 利用 barrier 机制实现整个 pipeline 的同步,从 source task 通过 inject barrier 发送到整个 pipeline。受影响的 task 收到所有上游 task 的 barrier 后会暂停并 ack 到 Coordinator 中,再向下游 task 发送 barrier。下游 task 收到 barrier 之后也执行类似的操作,受影响的 task 暂停并 ack,而不受影响的继续保持执行。所有 task 都 ack 到了 Coordinator 之后,同步结束。
接下来进入 update 阶段,在 update 阶段,Coordinator 会通知 TaskConfigManager 去更新 StreamTask 的配置,更新完成后与上下游重新连接,并继续执行。
至此,重配置流程结束。
五、评估:Trisk 与已有系统的性能对比
我们进行了小规模实验,主要围绕以下两点目标:
- 第一, 在 Trisk 上实现的控制器总体效果如何,是否能满足控制器的优化目标比如延迟控制?
- 第二, 对比已有的重配置执行技术,如 Flink 原生支持和前沿的 Megaphone 机制,Trisk 的执行效率如何?
实验环境如下:我们将 Trisk 实现在 Flink-1.10.0 上,并配置了 4 个节点的 Flink standalone cluster,每个节点配置了 8 个 slots。我们使用了一个真实应用 stock-exchange 和一个合成应用 word-count 来实现。stock-exchange 是一个实时的股票交易任务,需要实时处理股票交易订单,来避免对用户的交易决策造成影响。word-count 是一个常用于数据分析中的操作,我们主要对输入流的每一个 key 进行 count。
我们在 stock-exchange 上实现了一个简单但具有代表性的 latency-aware 控制器。最初 stock-exchange 作业部署了 10 个任务,输入流是股票申报订单,输入曲线如左图所示。控制器可以通过使用 scaling 和 load balancing 来控制作业的延迟,主要根据输入速率和工作负载来作出决策。比如在第一百秒的时候,因为输入速率增大,所以做 scaling out;而在第四百秒的时候,因为输入速率降低,所以会做 scaling in。
在 Trisk 和 Flink 上实现的控制器都需要大概 100 行代码,主要包含了控制策略的逻辑。
实验结果如右图所示。为了展示控制器的优化效果,我们主要对比了 Trisk/Flink 的原生支持/静态配置下的 stock-exchange 作业的延时变化情况。红线是静态配置的 stock-exchange 作业,绿线是 Flink 上的控制器对流作业的优化效果,蓝线则是 Trisk 对 stock-exchange 的优化效果。
红线结果表明,虽然静态配置在开始时运行良好,但因为输入速率的增加,它无法实时处理 100 秒过后的数据,导致延迟增加了两个数量级。相比之下,使用 Flink 原生配置实现的控制器,能够适应工作负载的变化,但是在执行重配置期间会导致高延迟峰值,大概比平时的延迟高出 1~2 个数量级。而 Trisk 上做出决策的控制器展示了毫秒级的重配置完成时间,且只有可以忽略不计的延迟增量。这主要归功于 Trisk 的部分暂停与恢复技术。
再将 Trisk 重配置执行期间的运行效果与两个现有的方法进行比较,一个是 Flink 的终止和重启机制,以及 megaphone 提出了 fluid state migration 机制,可以在 key 层面对重配置进行同步和更新。实验中我们对 word-count 使用了 load balancing,初始配置有 20 个任务,并在第 50 秒时触发 load balancing。整个过程会重分配所有并行任务之间的 workload。
为了了解他们的行为,我们比较了执行重配置时的延迟和吞吐量。
从延迟图可以看出,Trisk 比 Flink 重配置带来的延迟低,而与 megaphone 相比,Trisk 具有最短的完成时间,但峰值延迟相对较高。从吞吐量图中可以看出,在重配置过程中 Trisk 的吞吐量下降了,但恢复得比 Flink 快。对于 megaphone 来说,fluid state migration 需要更长的时间来完成重配置,但在重配置阶段会有更低的峰值延迟和更高的吞吐量。
总的来说,我们提出了 Trisk:以 Task 为中心的控制平台,可以普适、高效、和易用地支持重配置方法。在未来的工作中,我们也将继续探索在 Trisk 上实现更多样的控制策略,来更好地利用 Trisk 上的重配置方法。
更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~