一个写湿的程序猿

一个写湿的程序猿

Flink SQL Upsert 出现乱序问题如何解决?分析、优化建议

1. 业务问题

在使用 Flink SQL 订阅 Upsert 数据更新时,在某些情况下,如关联的表够多,并行度够大,可能会有概率出现数据乱序问题。

如果数据汇的存储是有 upsert 行为存储引擎,如 es,会导致部分数据丢失,影响数据的准确性。

2. 乱序和解决方案

2.1 乱序原因

Flink SQL 的数据在向下游传递时,会按照 join key 的值的 hash,shuffle 到特定的算子上,因此,这个算法保证相同的 join key 能有序地被处理。

如果,这个 join key 不是 unique key 的话,且刚好这个 join key 发生变更,就会先后发出一个包含旧 join key 值的 -D 事件和有新 join key 值的 +I 事件,两个事件被 shuffle 到不同的算子并行计算。

由于背压等问题,不同的算子在处理事件上速度不同,就有可能 +I 事件先被处理完,-D 事件后处理完,于是,最终在数据汇上呈现乱序现象。

这是出现上述问题的一个 Issue:https://issues.apache.org/jira/browse/FLINK-20374

例子中,有两张表,分别为 test 表和 status 表,它们的 join 逻辑为:SELECT t.*, s.name FROM test AS t LEFT JOIN status AS s ON t.status = s.id,其中 status 是维表,在设置并行度为 40,更新 test 表 id = 2 的 status 值由 1 变为 2,然后,这条 test 表 id = 2 的数据在 es 上丢失了。

2.2 解决方案说明

社区针对这个问题,给出的解决方案是在 sink 的上游增加一个算子SinkUpsertMaterializer.java,增加一层数据缓冲,针对乱序的数据做些特殊处理,以解决下游 sink 数据丢失问题。

最初的 PR 在这里https://github.com/apache/flink/pull/17230

前面提到 SinkUpsertMaterializer 中增加了一层数据缓冲,它在正常有序的情况下,state 会存有当前 upsert key 的最新值。

假设有一张表,它的主键是 1, 有一个 column 名为 version,值为 1。当它的 version 值发生变更时,state 中有且只有一条最新的和 db 一致的数据。它的 timeline 如下图所示

Flink SQL Upsert 出现乱序问题如何解决?分析、优化建议-LMLPHP

db upsert 发生的 -U 事件会抵冲掉前面存储的数据,然后再放入新的数据到 state 中。如果在发生抵冲时,state 里的数据是空,说明没有发生乱序,这个 -U 事件正常向后 emit。否则,就说明发生了乱序。

Flink SQL Upsert 出现乱序问题如何解决?分析、优化建议-LMLPHP

如上图所示,当更新 version 值为 2 时的数据发生乱序,+U 事件也会进入到 state 缓冲列表中,+U 事件会正常向后 emit 出去。而 -U 事件抵冲完前面的缓存的数据时,发现 state 里还有数据,说明当前 -U 事件是一个乱序事件,做抛弃处理。

SinkUpsertMaterializer 其实无法很好地处理更多级的乱序问题,它还能 cover 稍微复杂的场景,场景如下图所示:

Flink SQL Upsert 出现乱序问题如何解决?分析、优化建议-LMLPHP

它在多级发生乱序时,会重发可能是最新的数据但最先到达的事件,以保证目标存储的数据一致性。

但是,如果乱序的情况再复杂点,这层缓冲就无法 cover 了。就如上图所示,再有一层乱序时,重发机制就可能发出错误的数据给下游。

3. 乱序问题现状

SinkUpsertMaterializer 还不能完全解决 upsert 乱序问题,如果关心这块的解决情况的,可关注这个 issue:https://issues.apache.org/jira/browse/FLINK-22826

对于我们普通开发者来说,首先建议升级 flink 版本,将 SinkUpsertMaterializer 用起来。

另外,我们在写 sql 时,要关注 join key ,尽量优化 sql,让它以 upsert key 做 shuffle,减少出现多重乱序的 join sql,来减少数据异常。

具体思路是,将一些维表数据先生成视图,再以 upsert key shuffle 到主链路上,这样,乱序就只会发生在生成视图的逻辑里面,主链路因为以 upsert key shuffle,所以基本不会再出现二次乱序。

03-31 16:46