目录

  • Queryable State (可查询状态 )

  • State Backends (状态后端)

  • State Schema Evolution (状态模式演变)

  • Custom State Serialization (自定义状态序列化)


五、Queryable State Beta (可查询状态 测试版)

Architecture (架构)
Activating Queryable State (激活可查询状态)
Making State Queryable (使状态可插叙)
     Queryable State Stream (可查询状态流)
     Managed Keyed State (Managed Keyed 状态)
Querying State (查询状态)
     Example (例)
Configuration (组态)
     State Server (状态服务)
     Proxy (代理)
Limitations (限制)


注意:可查询状态的客户端API当前处于不断发展的状态,并且不保证所提供接口的稳定性。在即将推出的Flink版本中,客户端可能会发生重大的API更改。

简而言之,此功能将Flink的managed keyed(分区)状态(请参阅Working with State)公开给外部世界,并允许用户从Flink外部查询作业的状态。对于某些情况,可查询状态消除了对外部系统(例如键值存储)的分布式 operations/transactions 的需要,这通常是实践中的瓶颈。此外,此功能对于调试目的可能特别有用。

注意:查询状态对象时,无需任何同步或复制即可从并发线程访问该对象。这是一种设计选择,因为上述任何一种都会导致增加的作业延迟,我们希望避免这种情况。由于任何状态后端使用Java堆空间,例如MemoryStateBackendFsStateBackend在检索值时不能与副本一起使用,而是直接引用存储的值,读取-修改-写入模式是不安全的,并且可能导致可查询状态服务由于并发修改而失败。 RocksDBStateBackend可以避免这些问题。

5.1 Architecture (架构)

在展示如何使用可查询的状态之前,简要描述组成它的实体是很有用的。可查询的状态功能包含三个主要实体:

  1. QueryableStateClient,它(可能)在Flink集群之外运行并提交用户查询,

  2. QueryableStateClientProxy,在每个TaskManager上运行(即在Flink集群内),负责接收客户端的查询,代表他从负责的TaskManager获取请求的状态,并将其返回给客户端,然后

  3. QueryableStateServer,在每个TaskManager上运行,负责提供本地存储状态。

客户端连接到其中一个代理,并发送与特定键 k 相关联的状态的请求。如Working with State中所述,keyed state 在key组中组织,每个TaskManager都分配了许多这些key组。要发现哪个TaskManager负责持有k的key组,代理将询问JobManager。根据答案,代理将查询在该TaskManager上运行的QueryableStateServer以获取与k相关联的状态,并将响应转发回客户端。

5.2 Activating Queryable State (激活可查询状态)

要在Flink集群上启用可查询状态,只需将flink-queryable-state-runtime_2.11-1.8.0.jar从Flink发行版的opt/文件夹复制到lib/文件夹即可。否则,未启用可查询状态功能。

要验证群集是否在启用了可查询状态的情况下运行,请检查该行的任何任务管理器的日志:“Started the Queryable State Proxy Server @ …”。

5.3 Making State Queryable (使状态可插叙)

现在您已在群集上激活了可查询状态,现在是时候看看如何使用它了。为了使状态对外界可见,需要使用以下方法明确查询状态:

  • QueryableStateStream,一个充当接收器的便捷对象,并将其传入值作为可查询状态提供,或者

  • stateDescriptor.setQueryable(String queryableStateName)方法,它使得状态描述符所代表的keyed state可查询。

以下部分解释了这两种方法的用法。

5.3.1 Queryable State Stream (可查询状态流)

在KeyedStream上调用.asQueryableState(stateName, stateDescriptor)会返回一个QueryableStateStream,它将其值提供为可查询状态。根据状态的类型asQueryableState()方法有以下变体:

注意:没有可查询的ListState接收器,因为它会导致不断增长的列表,这些列表可能无法清除,因此最终会消耗太多内存。

返回的QueryableStateStream可以看作是接收器,无法进一步转换。在内部QueryableStateStream被转换为operator,该operator使用所有传入记录来更新可查询状态实例。更新逻辑由asQueryableState调用中提供的StateDescriptor的类型暗示。在如下所示的程序中,keyed流的所有记录将用于通过ValueState.update(value)更新状态实例:

注意queryableStateName参数可以任意选择,仅用于查询。它不必与状态自己的名字相同。

该变体对于哪种类型的状态可以被查询没有限制。这意味着它可以用于任何ValueState,ReduceState,ListState,MapState,AggregatingState和当前不推荐使用的FoldingState。

5.4 Querying State (查询状态)

到目前为止,您已将集群设置为以可查询状态运行,并且已将(某些)状态声明为可查询状态。现在是时候看看如何查询这个状态了。

为此,您可以使用QueryableStateClient帮助程序类。这可以在flink-queryable-state-clientjar中找到,它必须作为项目的pom.xml中的依赖项与flink-core一起显式包含,如下所示:

有关详细信息,您可以查看如何设置Flink程序。

QueryableStateClient将您的查询提交给内部代理,然后内部代理将处理您的查询并返回最终结果。初始化客户端的唯一要求是提供有效的TaskManager hostname(请记住,每个任务管理器上都运行可查询的状态代理)以及代理侦听的端口。更多有关如何配置代理和状态服务port(s)在Configuration Section。

在客户端准备好的情况下,要查询与类型K的键相关联的类型V的状态,您可以使用以下方法:

上面返回一个CompletableFuture,最终保存具有ID jobID作业的queryableStateName标识的可查询状态实例的状态值。key是您感兴趣的状态的键,keyTypeInfo将告诉Flink如何序列化/反序列化它。最后,stateDescriptor包含有关请求状态的必要信息,即其类型(Value,Reduce等)以及有关如何序列化/反序列化它的必要信息。

细心的读者会注意到返回的future包含一个S类值,即一个包含实际值的State对象。这可以是Flink支持的任何状态类型:ValueState,ReduceState,ListState,MapState,AggregatingState和当前不推荐使用的FoldingState。

注意:这些状态对象不允许修改包含的状态。您可以使用它们来获取状态的实际值,例如使用valueState.get(),或迭代所包含的<K,V>条目,例如使用mapState.entries(),但您无法修改它们。例如,在返回的列表状态上调用add()方法将抛出UnsupportedOperationException

注意:客户端是异步的,可以由多个线程共享。它需要在未使用时通过QueryableStateClient.shutdown()关闭以释放资源。

5.4.1 Example (例)

以下示例通过使其可查询来扩展CountWindowAverage示例(请参阅使用 Managed Keyed 状态),并显示如何查询此值:

在job中使用后,您可以检索job ID,然后从此operator查询任何键的当前状态:

5.5 Configuration (组态)

以下配置参数会影响可查询状态服务和客户端的行为。它们在QueryableStateOptions中定义。

5.5.1 State Server (状态服务)

  • queryable-state.server.ports:可查询状态服务器的服务端口范围。如果有多个task manager在同一台机器上运行,这对于避免端口冲突很有用。指定的范围可以是:端口:“9123”,一系列端口:“50100-50200”,或范围and/or列表:“50100-50200,50300-50400,51234”。默认端口为9067。

  • queryable-state.server.network-threads:接收状态服务传入请求的网络(事件循环)线程数(0 => #slots)

  • queryable-state.server.query-threads:处理/提供状态服务传入请求的线程数 (0 => #slots)。

5.5.2 Proxy (代理)

  • queryable-state.proxy.ports:可查询状态代理的服务器端口范围。如果有多个task manager在同一台机器上运行,这对于避免端口冲突很有用。指定的范围可以是:端口:“9123”,一系列端口:“50100-50200”,或范围and/or列表:“50100-50200,50300-50400,51234”。默认端口为9069。

  • queryable-state.proxy.network-threads:接收客户端代理传入请求的网络(事件循环)线程数(0 => #slots)

  • queryable-state.proxy.query-threads:处理/提供客户端代理的传入请求的线程数(0 => #slots)。

5.6 Limitations (限制)

  • 可查询状态生命周期与作业的生命周期绑定,例如,任务在启动时注册可查询状态,并在释放时注销它。在将来的版本中,需要将其解耦以便在任务完成后允许查询,并通过状态复制加速恢复。

  • 关于可用KvState的通知通过简单的告知发生。在未来,应该通过询问和确认来改进这一点。

  • 服务器和客户端会跟踪查询的统计信息。默认情况下,这些功能目前处于禁用状态,一旦有更好的支持通过Metrics系统发布这些数字,我们就应该启用统计数据。



六、State Backends (状态后端)

Flink提供了不同的状态后端,用于指定状态的存储方式和位置。

State可以位于Java的堆上或堆外。根据您的状态后端,Flink还可以管理应用程序的状态,这意味着Flink处理内存管理(如果需要可能会溢出到磁盘)以允许应用程序保持非常大的状态。默认情况下,配置文件flink-conf.yaml确定所有Flink作业的状态后端。

但是,可以基于每个作业重写默认状态后端,如下所示。

有关可用状态后端,其优点、限制和配置参数的详细信息,请参阅Deployment & Operations中的相应部分。

七、State Schema Evolution (状态模式演进)

Evolving state schema (不断演进的状态模式)
Supported data types for schema evolution (支持状态模式的数据类型)
     POJO types
     Avro types


Apache Flink流式应用程序通常设计为无限期或长时间运行。与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。与应用程序工作的数据模式相同,它们随应用程序一起演进。

此页面概述了如何改进状态类型的数据模式。当前限制因不同类型和状态结构(ValueState,ListState等)而异。

请注意,仅当您使用由Flink自己的类型序列化框架生成的状态序列化程序时,此页面上的信息才是相关的。也就是说,在声明您的状态时,提供的状态描述符未配置为使用特定的TypeSerializer或TypeInformation,在这种情况下,Flink会推断有关状态类型的信息:

在引擎下是否可以演进状态模型取决于用于读/写持久状态字节的串行器。简而言之,注册状态的架构只有在其序列化器正确支持它时才能演进。这由Flink的类型序列化框架生成的序列化器透明地处理(当前的支持范围如下所列)。

如果您打算为状态类型实现自定义TypeSerializer,并希望了解如何实现序列化程序以支持状态模式演变,请参阅自定义状态序列化。那个文档还包含有关状态序列化器和Flink状态后端之间相互作用的必要内部细节,以支持状态模式演变。

7.1 Evolving state schema (不断演进的状态模式)

要演进给定状态类型的模式,您将执行以下步骤:

  • 获取Flink流式作业的savepoint。

  • 更新应用程序中的状态类型(例如,修改Avro类型架构)。

  • 从savepoint还原作业。首次访问状态时,Flink将评估模式是否已更改为状态,并在必要时迁移状态模式。

迁移状态以适应更改的模式的过程自动发生,并且对于每个状态独立发生。此过程由Flink在内部执行,首先检查状态的新序列化程序是否具有与先前序列化程序不同的序列化模式; 如果是这样,之前的序列化器用于将状态读取到对象,并使用新的序列化器再次写回字节。

有关迁移过程的更多详细信息超出了本文档的范围; 请参考这里。

7.2 Supported data types for schema evolution (支持状态模式的数据类型)

目前,仅支持POJO和Avro类型的模式演变。因此,如果您关心状态的模式演变,目前建议始终使用Pojo或Avro作为状态数据类型

有计划扩大对更多复合类型的支持; 有关详细信息,请参阅FLINK-10896。

7.2.1 POJO types

Flink支持基于以下规则集的POJO类型模式演变:

  1. 字段可以删除。删除后,将在以后的检查点和保存点中删除已删除字段的先前值。

  2. 可以添加新字段。根据Java的定义,新字段将初始化为其类型的默认值。

  3. 声明的字段类型不能更改。

  4. POJO类型的类名称不能更改,包括类的namespace。

请注意,只有在使用Flink版本高于1.8.0的先前savepoint进行恢复时,才能演变POJO类型状态的模式。使用早于1.8.0的Flink版本进行还原时,无法更改此模式。

7.2.2 Avro types

Flink完全支持Avro类型状态的演进模式,只要模式更改被Avro的模式解析规则视为兼容。

一个限制是,当作业恢复时,Avro生成的用作状态类型的类无法重定位或拥有不同的命名空间。



八、Custom State Serialization(自定义状态序列化)

Custom Serialization for Managed State (Managed State的自定义序列化)

Using custom state serializers (使用自定义状态序列化)
State serializers and schema evolution (状态序列化和模式演变)
     The TypeSerializerSnapshot abstraction (TypeSerializerSnapshot抽象)
     How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions (Flink如何与TypeSerializer和TypeSerializerSnapshot抽象交互)
Predefined convenient TypeSerializerSnapshot classes (预定义的方便的TypeSerializerSnapshot类)
     Implementing a SimpleTypeSerializerSnapshot (实现一个SimpleTypeSerializerSnapshot)
     Implementing a CompositeTypeSerializerSnapshot (实现一个CompositeTypeSerializerSnapshot)
Implementation notes and best practices (实施说明和最佳实践)
Migrating from deprecated serializer snapshot APIs before Flink 1.7 (从Flink 1.7之前已弃用的序列化程序快照API迁移)


此页面的目标是需要对其状态使用自定义序列化的用户,包括如何提供自定义状态序列化程序以及实现允许状态模式演变的序列化程序的指南和最佳实践。

如果您只是使用Flink自有的序列化程序,则此页面无关紧要,可以忽略。

8.1 Using custom state serializers (使用自定义状态序列化)

注册managed operator或keyed state时,需要StateDescriptor指定状态名称以及有关状态类型的信息。Flink的类型序列化框架使用类型信息为状态创建适当的序列化程序。

也可以完全绕过这个并让Flink使用您自己的自定义序列化程序来序列化managed states,使用您自己的TypeSerializer实现来直接实例化StateDescriptor:

8.2 State serializers and schema evolution (状态序列化和模式演变)

本节介绍与状态序列化和模式演变相关的面向用户的抽象,以及有关Flink如何与这些抽象交互的必要内部详细信息。

从savepoint恢复时,Flink允许更改用于读取和写入先前注册状态的序列化程序,以便用户不会锁定到任何特定的序列化模式。恢复状态时,将为状态注册新的序列化程序(即,用于访问还原作业中的状态的StateDescriptor附带的序列化程序)。此新序列化程序可能具有与先前序列化程序不同的架构。因此,在实现状态序列化器时,除了读/写数据的基本逻辑之外,还要记住的另一个重要事项是如何在将来更改序列化模式。

当谈到模式时,在该上下文中,该术语在引用状态类型的数据模型和状态类型的序列化二进制格式之间是可互换的。一般来说,架构可能会在以下几种情况下发生变化:

  1. 状态类型的数据模式已经改变,即从用作状态的POJO添加或删除字段。

  2. 一般来说,在更改数据模式后,需要升级序列化程序的序列化格式。

  3. 序列化的配置已更改。

为了使新执行具有关于所写状态的状态的信息并检测模式是否已经改变,在获取operator状态的保存点时,需要将状态序列化的快照与状态字节一起写入。这被抽象为TypeSerializerSnapshot,在下一小节中进行了解释。

8.2.1 The TypeSerializerSnapshot abstraction (TypeSerializerSnapshot抽象)

序列化程序的TypeSerializerSnapshot是一个时间点信息,作为状态序列化程序写入模式的单一事实来源,以及恢复与给定时间点相同的序列化程序所必需的任何其他信息。关于在恢复时应该写入和读取的内容的逻辑,因为在writeSnapshotreadSnapshot方法中定义了序列化器快照。

请注意,快照自己的写入架构可能还需要随时间更改(例如,当您希望向快照添加有关序列化程序的更多信息时)。为此,快照的版本化,并在getCurrentVersion方法中定义当前版本号。在还原时,从savepoint读取序列化程序快照时,将在其中写入快照的架构版本提供给readSnapshot方法,以便读取实现可以处理不同的版本。

在还原时,应在resolveSchemaCompatibility方法中实现检测新序列化程序的架构是否已更改的逻辑。在恢复的Operator执行中使用新的序列化程序再次注册先前的已注册状态时,新的序列化程序将通过此方法提供给先前的序列化程序的快照。此方法返回TypeSerializerSchemaCompatibility,表示兼容性解析的结果,可以是以下之一:

  1. TypeSerializerSchemaCompatibility.compatibleAsIs(): 此结果表明新串行器是兼容的,这意味着新的串行器与先前的串行器具有相同的架构。可能已在resolveSchemaCompatibility方法中重新配置了新的序列化程序,以使其兼容。

  2. TypeSerializerSchemaCompatibility.compatibleAfterMigration(): 此结果表明新的序列化程序具有不同的序列化架构,并且可以通过使用先前的序列化程序(识别旧架构)从旧架构迁移以将字节读取到状态对象,然后将对象重写为字节使用新的序列化程序(识别新架构)。

  3. TypeSerializerSchemaCompatibility.incompatible(): 此结果表明新的序列化程序具有不同的序列化架构,但无法从旧架构迁移。

最后一点详细说明了在需要迁移的情况下如何获得先前的序列化器。序列化程序的TypeSerializerSnapshot的另一个重要作用是它用作恢复先前序列化程序的工厂。更具体地说,TypeSerializerSnapshot应该实现restoreSerializer方法来实例化一个识别前一个序列化程序的模式和配置的序列化程序实例,因此可以安全地读取前一个序列化程序写入的数据。

8.2.2 How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions (Flink如何与TypeSerializer和TypeSerializerSnapshot抽象交互)

总结一下,本节总结了Flink,或者更具体地说是状态后端如何与抽象交互。根据状态后端,交互略有不同,但这与状态序列化程序及其序列化程序快照的实现是正交的。

Off-heap state backends (e.g. RocksDBStateBackend) (堆外状态后端,例如RocksDBStateBackend)

  1. Register new state with a state serializer that has schema A (使用具有模式A的状态序列化程序注册新状态)

    • 状态的已注册TypeSerializer用于在每次状态访问时读/写状态。

    • 状态在模式A中写。

  2. Take a savepoint(取得一个savepoint)

    • 通过TypeSerializer#snapshotConfiguration方法提取序列化程序快照。

    • 序列化程序快照将写入savepoint,以及已经序列化的状态字节(使用架构A)。

  3. Restored execution re-accesses restored state bytes with new state serializer that has schema B(使用新状态序列化器的模式B来恢复执行重新访问恢复的状态字节)

    • 恢复先前的状态序列化程序的快照。

    • 状态字节在还原时不反序列化,仅加载回状态后端(因此,仍在模式A中)。

    • 收到新的序列化程序后,它将通过TypeSerializer#resolveSchemaCompatibility提供给已恢复的先前序列化程序的快照,以检查架构兼容性。

  4. Migrate state bytes in backend from schema A to schema B(将后端中的状态字节从架构A迁移到架构B)

    • 如果兼容性解决方案反映了架构已更改并且可以进行迁移,则会执行架构迁移。识别模式A的先前状态序列化程序将通过TypeSerializerSnapshot#restoreSerializer()从序列化程序快照中获取,并用于将状态字节反序列化为对象,然后使用新的序列化程序重新编写,后者识别模式B完成迁移。在继续处理之前,将访问状态的所有条目全部迁移。

    • 如果解析信号不兼容,则状态访问失败并出现异常。

Heap state backends (e.g. MemoryStateBackend, FsStateBackend) (堆状态后端,例如MemoryStateBackend,FsStateBackend )

  1. Register new state with a state serializer that has schema A (使用具有模式A的状态序列化程序注册新状态)

    • 注册的TypeSerializer由状态后端维护。

  2. Take a savepoint, serializing all state with schema A (获取savepoint,使用模式A序列化所有状态)

    • 通过TypeSerializer#snapshotConfiguration方法提取序列化程序快照。

    • 序列化程序快照将写入保存点。

    • 现在,状态对象被序列化为savepoint,以模式A编写。

  3. On restore, deserialize state into objects in heap (在还原时,将状态反序列化为堆中的对象)

    • 恢复先前的状态序列化程序的快照。

    • 识别模式A的先前序列化程序是通过TypeSerializerSnapshot#restoreSerializer()从序列化程序快照获取的,用于将状态字节反序列化为对象。

    • 从现在开始,所有的状态都已经反序列化了。

  4. Restored execution re-accesses previous state with new state serializer that has schema B(使用新状态序列化器的模式B来恢复执行重新访问恢复的状态字节)

    • 收到新的序列化程序后,它将通过TypeSerializer#resolveSchemaCompatibility提供给已恢复的先前序列化程序的快照,以检查架构兼容性。

    • 如果兼容性检查发出需要迁移的信号,则在这种情况下不会发生任何事情,因为对于堆后端,所有状态都已反序列化为对象。

    • 如果解析信号不兼容,则状态访问失败并出现异常。

  5. Take another savepoint, serializing all state with schema B(拿另一个savepoint,使用模式B序列化所有状态)

    • 与步骤2相同,但现在状态字节全部在模式B中。

8.3 Predefined convenient TypeSerializerSnapshot classes (预定义的方便的TypeSerializerSnapshot类)

Flink提供了两个可用于典型场景的抽象基类TypeSerializerSnapshot类:SimpleTypeSerializerSnapshotCompositeTypeSerializerSnapshot

提供这些预定义快照作为其序列化程序快照的序列化程序必须始终具有自己的独立子类实现。这对应于不在不同序列化程序之间共享快照类的最佳实践,这将在下一节中进行更全面的解释。

8.3.1 Implementing a SimpleTypeSerializerSnapshot (实现一个SimpleTypeSerializerSnapshot)

SimpleTypeSerializerSnapshot适用于没有任何状态或配置的序列化程序,这实际上意味着序列化程序的序列化模式仅由序列化程序的类定义。

使用SimpleTypeSerializerSnapshot作为序列化程序的快照类时,只有2种可能的兼容性解析结果:

  • TypeSerializerSchemaCompatibility.compatibleAsIs(), 如果新的序列化程序类保持相同,或

  • TypeSerializerSchemaCompatibility.incompatible(), 如果新的序列化程序类与前一个类不同。

下面是如何使用SimpleTypeSerializerSnapshot的示例,使用Flink的IntSerializer作为示例:

IntSerializer没有状态或配置。序列化格式仅由序列化程序类本身定义,并且只能由另一个IntSerializer读取。因此,它适合SimpleTypeSerializerSnapshot的用例。

SimpleTypeSerializerSnapshot的基础超类构造函数需要相应序列化程序的提供者实例,无论快照当前是在恢复还是在快照期间写入。该提供者用于创建还原序列化程序,以及类型检查以验证新的序列化程序是否与预期的序列化程序类相同。

8.3.2 Implementing a CompositeTypeSerializerSnapshot (实现一个CompositeTypeSerializerSnapshot)

CompositeTypeSerializerSnapshot适用于依赖多个嵌套序列化程序进行序列化的序列化程序。

在进一步解释之前,我们将依赖于多个嵌套序列化器的串行器称为此上下文中的“外部”序列化器。这方面的例子可能是MapSerializer,ListSerializer,GenericArraySerializer等。例如,考虑MapSerializer - 键和值序列化器将是嵌套的序列化器,而MapSerializer本身就是“外部”序列化器。

在这种情况下,外部序列化程序的快照还应包含嵌套序列化程序的快照,以便可以独立检查嵌套序列化程序的兼容性。在解析外部序列化程序的兼容性时,需要考虑每个嵌套序列化程序的兼容性。

提供CompositeTypeSerializerSnapshot以帮助实现这些复合序列化器的快照。它涉及读取和编写嵌套的序列化程序快照,以及解决最终的兼容性结果,同时考虑到所有嵌套序列化程序的兼容性。

下面是如何使用CompositeTypeSerializerSnapshot的示例,使用Flink的MapSerializer作为示例:

将新的序列化程序快照实现为CompositeTypeSerializerSnapshot的子类时,必须实现以下三种方法:

  • #getCurrentOuterSnapshotVersion(): 此方法定义当前外部序列化程序快照的序列化二进制格式的版本。

  • #getNestedSerializers(TypeSerializer): 给定外部序列化程序,返回其嵌套的序列化程序。

  • #createOuterSerializerWithNestedSerializers(TypeSerializer[]): 给定嵌套的序列化程序,创建外部序列化程序的实例。

上面的示例是CompositeTypeSerializerSnapshot,除了嵌套的序列化程序的快照之外,没有额外的信息要进行快照。因此,其外部快照版本可能永远不需要上升。但是,其他一些序列化程序包含一些需要与嵌套组件序列化程序一起保存的其他静态配置。这方面的一个例子是Flink的GenericArraySerializer,除了嵌套的元素序列化器之外,它还包含数组元素类的类。

在这些情况下,需要在CompositeTypeSerializerSnapshot上实现另外三种方法:

  • #writeOuterSnapshot(DataOutputView): 定义外部快照信息的写入方式。

  • #readOuterSnapshot(int, DataInputView, ClassLoader): 定义如何读取外部快照信息。

  • #isOuterSnapshotCompatible(TypeSerializer): 检查外部快照信息是否保持相同。

默认情况下,CompositeTypeSerializerSnapshot假定没有任何外部快照信息可供读/写,因此上述方法具有空的默认实现。如果子类具有外部快照信息,则必须实现所有三种方法。

下面是使用Flink的GenericArraySerializer作为示例,将CompositeTypeSerializerSnapshot用于具有外部快照信息的复合串行器快照的示例:

上面的代码片段中有两点需要注意。首先,由于此CompositeTypeSerializerSnapshot实现具有作为快照的一部分编写的外部快照信息,因此只要外部快照信息的序列化格式发生更改,就必须升级由getCurrentOuterSnapshotVersion()定义的外部快照版本。

其次,请注意在编写组件类时我们如何避免使用Java序列化,只需编写类名并在读回快照时动态加载它。避免用于编写串行器快照内容的Java序列化通常是一个很好的做法。有关这方面的更多详细信息将在下一节中介绍。

8.4 Implementation notes and best practices (实施说明和最佳实践)

1. Flink通过使用classname实例化它们来恢复序列化程序快照

序列化程序的快照是注册状态序列化的唯一真实来源,它是savepoint中读取状态的入口点。为了能够恢复和访问先前的状态,必须能够恢复先前的状态序列化程序的快照。

Flink通过首先使用其classname(与快照字节一起写入)实例化TypeSerializerSnapshot来恢复序列化程序快照。因此,为了避免出现意外的类名更改或实例化失败,TypeSerializerSnapshot类应该:

  • 避免被实现为匿名类或嵌套类,

  • 有一个public,无参的构造函数用于实例化

2. 避免跨不同的序列化程序共享相同的TypeSerializerSnapshot类

由于架构兼容性检查是通过序列化程序快照,因此让多个序列化程序返回与其快照相同的TypeSerializerSnapshot类会使
TypeSerializerSnapshot#resolveSchemaCompatibilityTypeSerializerSnapshot#restoreSerializer()方法的实现复杂化。

这也是对问题的严重分离; 单个序列化程序的序列化架构,配置以及如何还原它应该合并到其自己的专用TypeSerializerSnapshot类中。

3. 避免对序列化程序快照内容使用Java序列化

在编写持久化串行器快照的内容时,不应使用Java序列化。例如,一个序列化程序需要将其目标类型的类保留为其快照的一部分。有关该类的信息应该通过编写类名来保留,而不是使用Java直接序列化该类。读取快照时,将读取类名,并用于通过名称动态加载类。

此做法可确保始终可以安全地读取序列化程序快照。在上面的示例中,如果使用Java序列化持久保存类型类,则一旦类实现发生更改,快照可能不再可读,并且根据Java序列化细节不再具有二进制兼容性。

8.5 Migrating from deprecated serializer snapshot APIs before Flink 1.7 (从Flink 1.7之前已弃用的序列化程序快照API迁移)

本节是从Flink 1.7之前存在的序列化程序和序列化程序快照进行API迁移的指南。

在Flink 1.7之前,序列化程序快照是作为TypeSerializerConfigSnapshot实现的(现在已弃用,并且最终将被删除以完全替换为新的TypeSerializerSnapshot接口)。此外,序列化器模式兼容性检查的责任在TypeSerializer中生效,在TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)方法中实现。

新旧抽象之间的另一个主要区别是,已弃用的TypeSerializerConfigSnapshot无法实例化先前的序列化程序。因此,如果序列化程序仍将TypeSerializerConfigSnapshot的子类作为其快照返回,则序列化程序实例本身将始终使用Java序列化写入保存点,以便在还原时可以使用先前的序列化程序。这是非常不合需要的,因为恢复作业是否成功容易受到先前序列化程序类的可用性的影响,或者通常,是否可以使用Java序列化在恢复时读回序列化程序实例。这意味着您只能使用适用于您的状态的相同序列化程序,并且一旦您想要升级序列化程序类或执行架构迁移,就可能会出现问题。

为了面向未来并具有迁移状态序列化程序和模式的灵活性,强烈建议从旧的抽象中进行迁移。执行此操作的步骤如下:

  1. 实现TypeSerializerSnapshot的新子类。这将是序列化程序的新快照。

  2. TypeSerializer#snapshotConfiguration()方法中将新的TypeSerializerSnapshot作为序列化程序的序列化程序快照返回。

  3. 从Flink 1.7之前存在的savepoint还原作业,然后再次使用savepoint。请注意,在此步骤中,序列化程序的旧TypeSerializerConfigSnapshot必须仍存在于类路径中,并且不得删除TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)方法的实现。此过程的目的是使用新实现的序列化程序TypeSerializerSnapshot替换旧savepoint中编写的TypeSerializerConfigSnapshot

  4. 使用Flink 1.7获取保存点后,保存点将包含TypeSerializerSnapshot作为状态序列化程序快照,并且不再在保存点中写入序列化程序实例。此时,现在可以安全地删除旧抽象类的所有实现(从序列化器中删除旧的TypeSerializerConfigSnapshot实现以及TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot))。

五、Queryable State Beta (可查询状态 测试版)

Architecture (架构)
Activating Queryable State (激活可查询状态)
Making State Queryable (使状态可插叙)
     Queryable State Stream (可查询状态流)
     Managed Keyed State (Managed Keyed 状态)
Querying State (查询状态)
     Example (例)
Configuration (组态)
     State Server (状态服务)
     Proxy (代理)
Limitations (限制)


注意:可查询状态的客户端API当前处于不断发展的状态,并且不保证所提供接口的稳定性。在即将推出的Flink版本中,客户端可能会发生重大的API更改。

简而言之,此功能将Flink的managed keyed(分区)状态(请参阅Working with State)公开给外部世界,并允许用户从Flink外部查询作业的状态。对于某些情况,可查询状态消除了对外部系统(例如键值存储)的分布式 operations/transactions 的需要,这通常是实践中的瓶颈。此外,此功能对于调试目的可能特别有用。

注意:查询状态对象时,无需任何同步或复制即可从并发线程访问该对象。这是一种设计选择,因为上述任何一种都会导致增加的作业延迟,我们希望避免这种情况。由于任何状态后端使用Java堆空间,例如MemoryStateBackendFsStateBackend在检索值时不能与副本一起使用,而是直接引用存储的值,读取-修改-写入模式是不安全的,并且可能导致可查询状态服务由于并发修改而失败。 RocksDBStateBackend可以避免这些问题。

5.1 Architecture (架构)

在展示如何使用可查询的状态之前,简要描述组成它的实体是很有用的。可查询的状态功能包含三个主要实体:

  1. QueryableStateClient,它(可能)在Flink集群之外运行并提交用户查询,

  2. QueryableStateClientProxy,在每个TaskManager上运行(即在Flink集群内),负责接收客户端的查询,代表他从负责的TaskManager获取请求的状态,并将其返回给客户端,然后

  3. QueryableStateServer,在每个TaskManager上运行,负责提供本地存储状态。

客户端连接到其中一个代理,并发送与特定键 k 相关联的状态的请求。如Working with State中所述,keyed state 在key组中组织,每个TaskManager都分配了许多这些key组。要发现哪个TaskManager负责持有k的key组,代理将询问JobManager。根据答案,代理将查询在该TaskManager上运行的QueryableStateServer以获取与k相关联的状态,并将响应转发回客户端。

5.2 Activating Queryable State (激活可查询状态)

要在Flink集群上启用可查询状态,只需将flink-queryable-state-runtime_2.11-1.8.0.jar从Flink发行版的opt/文件夹复制到lib/文件夹即可。否则,未启用可查询状态功能。

要验证群集是否在启用了可查询状态的情况下运行,请检查该行的任何任务管理器的日志:“Started the Queryable State Proxy Server @ …”。

5.3 Making State Queryable (使状态可插叙)

现在您已在群集上激活了可查询状态,现在是时候看看如何使用它了。为了使状态对外界可见,需要使用以下方法明确查询状态:

  • QueryableStateStream,一个充当接收器的便捷对象,并将其传入值作为可查询状态提供,或者

  • stateDescriptor.setQueryable(String queryableStateName)方法,它使得状态描述符所代表的keyed state可查询。

以下部分解释了这两种方法的用法。

5.3.1 Queryable State Stream (可查询状态流)

在KeyedStream上调用.asQueryableState(stateName, stateDescriptor)会返回一个QueryableStateStream,它将其值提供为可查询状态。根据状态的类型asQueryableState()方法有以下变体:

注意:没有可查询的ListState接收器,因为它会导致不断增长的列表,这些列表可能无法清除,因此最终会消耗太多内存。

返回的QueryableStateStream可以看作是接收器,无法进一步转换。在内部QueryableStateStream被转换为operator,该operator使用所有传入记录来更新可查询状态实例。更新逻辑由asQueryableState调用中提供的StateDescriptor的类型暗示。在如下所示的程序中,keyed流的所有记录将用于通过ValueState.update(value)更新状态实例:

注意queryableStateName参数可以任意选择,仅用于查询。它不必与状态自己的名字相同。

该变体对于哪种类型的状态可以被查询没有限制。这意味着它可以用于任何ValueState,ReduceState,ListState,MapState,AggregatingState和当前不推荐使用的FoldingState。

5.4 Querying State (查询状态)

到目前为止,您已将集群设置为以可查询状态运行,并且已将(某些)状态声明为可查询状态。现在是时候看看如何查询这个状态了。

为此,您可以使用QueryableStateClient帮助程序类。这可以在flink-queryable-state-clientjar中找到,它必须作为项目的pom.xml中的依赖项与flink-core一起显式包含,如下所示:

有关详细信息,您可以查看如何设置Flink程序。

QueryableStateClient将您的查询提交给内部代理,然后内部代理将处理您的查询并返回最终结果。初始化客户端的唯一要求是提供有效的TaskManager hostname(请记住,每个任务管理器上都运行可查询的状态代理)以及代理侦听的端口。更多有关如何配置代理和状态服务port(s)在Configuration Section。

在客户端准备好的情况下,要查询与类型K的键相关联的类型V的状态,您可以使用以下方法:

上面返回一个CompletableFuture,最终保存具有ID jobID作业的queryableStateName标识的可查询状态实例的状态值。key是您感兴趣的状态的键,keyTypeInfo将告诉Flink如何序列化/反序列化它。最后,stateDescriptor包含有关请求状态的必要信息,即其类型(Value,Reduce等)以及有关如何序列化/反序列化它的必要信息。

细心的读者会注意到返回的future包含一个S类值,即一个包含实际值的State对象。这可以是Flink支持的任何状态类型:ValueState,ReduceState,ListState,MapState,AggregatingState和当前不推荐使用的FoldingState。

注意:这些状态对象不允许修改包含的状态。您可以使用它们来获取状态的实际值,例如使用valueState.get(),或迭代所包含的<K,V>条目,例如使用mapState.entries(),但您无法修改它们。例如,在返回的列表状态上调用add()方法将抛出UnsupportedOperationException

注意:客户端是异步的,可以由多个线程共享。它需要在未使用时通过QueryableStateClient.shutdown()关闭以释放资源。

5.4.1 Example (例)

以下示例通过使其可查询来扩展CountWindowAverage示例(请参阅使用 Managed Keyed 状态),并显示如何查询此值:

在job中使用后,您可以检索job ID,然后从此operator查询任何键的当前状态:

5.5 Configuration (组态)

以下配置参数会影响可查询状态服务和客户端的行为。它们在QueryableStateOptions中定义。

5.5.1 State Server (状态服务)

  • queryable-state.server.ports:可查询状态服务器的服务端口范围。如果有多个task manager在同一台机器上运行,这对于避免端口冲突很有用。指定的范围可以是:端口:“9123”,一系列端口:“50100-50200”,或范围and/or列表:“50100-50200,50300-50400,51234”。默认端口为9067。

  • queryable-state.server.network-threads:接收状态服务传入请求的网络(事件循环)线程数(0 => #slots)

  • queryable-state.server.query-threads:处理/提供状态服务传入请求的线程数 (0 => #slots)。

5.5.2 Proxy (代理)

  • queryable-state.proxy.ports:可查询状态代理的服务器端口范围。如果有多个task manager在同一台机器上运行,这对于避免端口冲突很有用。指定的范围可以是:端口:“9123”,一系列端口:“50100-50200”,或范围and/or列表:“50100-50200,50300-50400,51234”。默认端口为9069。

  • queryable-state.proxy.network-threads:接收客户端代理传入请求的网络(事件循环)线程数(0 => #slots)

  • queryable-state.proxy.query-threads:处理/提供客户端代理的传入请求的线程数(0 => #slots)。

5.6 Limitations (限制)

  • 可查询状态生命周期与作业的生命周期绑定,例如,任务在启动时注册可查询状态,并在释放时注销它。在将来的版本中,需要将其解耦以便在任务完成后允许查询,并通过状态复制加速恢复。

  • 关于可用KvState的通知通过简单的告知发生。在未来,应该通过询问和确认来改进这一点。

  • 服务器和客户端会跟踪查询的统计信息。默认情况下,这些功能目前处于禁用状态,一旦有更好的支持通过Metrics系统发布这些数字,我们就应该启用统计数据。



六、State Backends (状态后端)

Flink提供了不同的状态后端,用于指定状态的存储方式和位置。

State可以位于Java的堆上或堆外。根据您的状态后端,Flink还可以管理应用程序的状态,这意味着Flink处理内存管理(如果需要可能会溢出到磁盘)以允许应用程序保持非常大的状态。默认情况下,配置文件flink-conf.yaml确定所有Flink作业的状态后端。

但是,可以基于每个作业重写默认状态后端,如下所示。

有关可用状态后端,其优点、限制和配置参数的详细信息,请参阅Deployment & Operations中的相应部分。

七、State Schema Evolution (状态模式演进)

Evolving state schema (不断演进的状态模式)
Supported data types for schema evolution (支持状态模式的数据类型)
     POJO types
     Avro types


Apache Flink流式应用程序通常设计为无限期或长时间运行。与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。与应用程序工作的数据模式相同,它们随应用程序一起演进。

此页面概述了如何改进状态类型的数据模式。当前限制因不同类型和状态结构(ValueState,ListState等)而异。

请注意,仅当您使用由Flink自己的类型序列化框架生成的状态序列化程序时,此页面上的信息才是相关的。也就是说,在声明您的状态时,提供的状态描述符未配置为使用特定的TypeSerializer或TypeInformation,在这种情况下,Flink会推断有关状态类型的信息:

在引擎下是否可以演进状态模型取决于用于读/写持久状态字节的串行器。简而言之,注册状态的架构只有在其序列化器正确支持它时才能演进。这由Flink的类型序列化框架生成的序列化器透明地处理(当前的支持范围如下所列)。

如果您打算为状态类型实现自定义TypeSerializer,并希望了解如何实现序列化程序以支持状态模式演变,请参阅自定义状态序列化。那个文档还包含有关状态序列化器和Flink状态后端之间相互作用的必要内部细节,以支持状态模式演变。

7.1 Evolving state schema (不断演进的状态模式)

要演进给定状态类型的模式,您将执行以下步骤:

  • 获取Flink流式作业的savepoint。

  • 更新应用程序中的状态类型(例如,修改Avro类型架构)。

  • 从savepoint还原作业。首次访问状态时,Flink将评估模式是否已更改为状态,并在必要时迁移状态模式。

迁移状态以适应更改的模式的过程自动发生,并且对于每个状态独立发生。此过程由Flink在内部执行,首先检查状态的新序列化程序是否具有与先前序列化程序不同的序列化模式; 如果是这样,之前的序列化器用于将状态读取到对象,并使用新的序列化器再次写回字节。

有关迁移过程的更多详细信息超出了本文档的范围; 请参考这里。

7.2 Supported data types for schema evolution (支持状态模式的数据类型)

目前,仅支持POJO和Avro类型的模式演变。因此,如果您关心状态的模式演变,目前建议始终使用Pojo或Avro作为状态数据类型

有计划扩大对更多复合类型的支持; 有关详细信息,请参阅FLINK-10896。

7.2.1 POJO types

Flink支持基于以下规则集的POJO类型模式演变:

  1. 字段可以删除。删除后,将在以后的检查点和保存点中删除已删除字段的先前值。

  2. 可以添加新字段。根据Java的定义,新字段将初始化为其类型的默认值。

  3. 声明的字段类型不能更改。

  4. POJO类型的类名称不能更改,包括类的namespace。

请注意,只有在使用Flink版本高于1.8.0的先前savepoint进行恢复时,才能演变POJO类型状态的模式。使用早于1.8.0的Flink版本进行还原时,无法更改此模式。

7.2.2 Avro types

Flink完全支持Avro类型状态的演进模式,只要模式更改被Avro的模式解析规则视为兼容。

一个限制是,当作业恢复时,Avro生成的用作状态类型的类无法重定位或拥有不同的命名空间。


八、Custom State Serialization(自定义状态序列化)

Custom Serialization for Managed State (Managed State的自定义序列化)

Using custom state serializers (使用自定义状态序列化)
State serializers and schema evolution (状态序列化和模式演变)
     The TypeSerializerSnapshot abstraction (TypeSerializerSnapshot抽象)
     How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions (Flink如何与TypeSerializer和TypeSerializerSnapshot抽象交互)
Predefined convenient TypeSerializerSnapshot classes (预定义的方便的TypeSerializerSnapshot类)
     Implementing a SimpleTypeSerializerSnapshot (实现一个SimpleTypeSerializerSnapshot)
     Implementing a CompositeTypeSerializerSnapshot (实现一个CompositeTypeSerializerSnapshot)
Implementation notes and best practices (实施说明和最佳实践)
Migrating from deprecated serializer snapshot APIs before Flink 1.7 (从Flink 1.7之前已弃用的序列化程序快照API迁移)


此页面的目标是需要对其状态使用自定义序列化的用户,包括如何提供自定义状态序列化程序以及实现允许状态模式演变的序列化程序的指南和最佳实践。

如果您只是使用Flink自有的序列化程序,则此页面无关紧要,可以忽略。

8.1 Using custom state serializers (使用自定义状态序列化)

注册managed operator或keyed state时,需要StateDescriptor指定状态名称以及有关状态类型的信息。Flink的类型序列化框架使用类型信息为状态创建适当的序列化程序。

也可以完全绕过这个并让Flink使用您自己的自定义序列化程序来序列化managed states,使用您自己的TypeSerializer实现来直接实例化StateDescriptor:

8.2 State serializers and schema evolution (状态序列化和模式演变)

本节介绍与状态序列化和模式演变相关的面向用户的抽象,以及有关Flink如何与这些抽象交互的必要内部详细信息。

从savepoint恢复时,Flink允许更改用于读取和写入先前注册状态的序列化程序,以便用户不会锁定到任何特定的序列化模式。恢复状态时,将为状态注册新的序列化程序(即,用于访问还原作业中的状态的StateDescriptor附带的序列化程序)。此新序列化程序可能具有与先前序列化程序不同的架构。因此,在实现状态序列化器时,除了读/写数据的基本逻辑之外,还要记住的另一个重要事项是如何在将来更改序列化模式。

当谈到模式时,在该上下文中,该术语在引用状态类型的数据模型和状态类型的序列化二进制格式之间是可互换的。一般来说,架构可能会在以下几种情况下发生变化:

  1. 状态类型的数据模式已经改变,即从用作状态的POJO添加或删除字段。

  2. 一般来说,在更改数据模式后,需要升级序列化程序的序列化格式。

  3. 序列化的配置已更改。

为了使新执行具有关于所写状态的状态的信息并检测模式是否已经改变,在获取operator状态的保存点时,需要将状态序列化的快照与状态字节一起写入。这被抽象为TypeSerializerSnapshot,在下一小节中进行了解释。

8.2.1 The TypeSerializerSnapshot abstraction (TypeSerializerSnapshot抽象)

序列化程序的TypeSerializerSnapshot是一个时间点信息,作为状态序列化程序写入模式的单一事实来源,以及恢复与给定时间点相同的序列化程序所必需的任何其他信息。关于在恢复时应该写入和读取的内容的逻辑,因为在writeSnapshotreadSnapshot方法中定义了序列化器快照。

请注意,快照自己的写入架构可能还需要随时间更改(例如,当您希望向快照添加有关序列化程序的更多信息时)。为此,快照的版本化,并在getCurrentVersion方法中定义当前版本号。在还原时,从savepoint读取序列化程序快照时,将在其中写入快照的架构版本提供给readSnapshot方法,以便读取实现可以处理不同的版本。

在还原时,应在resolveSchemaCompatibility方法中实现检测新序列化程序的架构是否已更改的逻辑。在恢复的Operator执行中使用新的序列化程序再次注册先前的已注册状态时,新的序列化程序将通过此方法提供给先前的序列化程序的快照。此方法返回TypeSerializerSchemaCompatibility,表示兼容性解析的结果,可以是以下之一:

  1. TypeSerializerSchemaCompatibility.compatibleAsIs(): 此结果表明新串行器是兼容的,这意味着新的串行器与先前的串行器具有相同的架构。可能已在resolveSchemaCompatibility方法中重新配置了新的序列化程序,以使其兼容。

  2. TypeSerializerSchemaCompatibility.compatibleAfterMigration(): 此结果表明新的序列化程序具有不同的序列化架构,并且可以通过使用先前的序列化程序(识别旧架构)从旧架构迁移以将字节读取到状态对象,然后将对象重写为字节使用新的序列化程序(识别新架构)。

  3. TypeSerializerSchemaCompatibility.incompatible(): 此结果表明新的序列化程序具有不同的序列化架构,但无法从旧架构迁移。

最后一点详细说明了在需要迁移的情况下如何获得先前的序列化器。序列化程序的TypeSerializerSnapshot的另一个重要作用是它用作恢复先前序列化程序的工厂。更具体地说,TypeSerializerSnapshot应该实现restoreSerializer方法来实例化一个识别前一个序列化程序的模式和配置的序列化程序实例,因此可以安全地读取前一个序列化程序写入的数据。

8.2.2 How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions (Flink如何与TypeSerializer和TypeSerializerSnapshot抽象交互)

总结一下,本节总结了Flink,或者更具体地说是状态后端如何与抽象交互。根据状态后端,交互略有不同,但这与状态序列化程序及其序列化程序快照的实现是正交的。

Off-heap state backends (e.g. RocksDBStateBackend) (堆外状态后端,例如RocksDBStateBackend)

  1. Register new state with a state serializer that has schema A (使用具有模式A的状态序列化程序注册新状态)

    • 状态的已注册TypeSerializer用于在每次状态访问时读/写状态。

    • 状态在模式A中写。

  2. Take a savepoint(取得一个savepoint)

    • 通过TypeSerializer#snapshotConfiguration方法提取序列化程序快照。

    • 序列化程序快照将写入savepoint,以及已经序列化的状态字节(使用架构A)。

  3. Restored execution re-accesses restored state bytes with new state serializer that has schema B(使用新状态序列化器的模式B来恢复执行重新访问恢复的状态字节)

    • 恢复先前的状态序列化程序的快照。

    • 状态字节在还原时不反序列化,仅加载回状态后端(因此,仍在模式A中)。

    • 收到新的序列化程序后,它将通过TypeSerializer#resolveSchemaCompatibility提供给已恢复的先前序列化程序的快照,以检查架构兼容性。

  4. Migrate state bytes in backend from schema A to schema B(将后端中的状态字节从架构A迁移到架构B)

    • 如果兼容性解决方案反映了架构已更改并且可以进行迁移,则会执行架构迁移。识别模式A的先前状态序列化程序将通过TypeSerializerSnapshot#restoreSerializer()从序列化程序快照中获取,并用于将状态字节反序列化为对象,然后使用新的序列化程序重新编写,后者识别模式B完成迁移。在继续处理之前,将访问状态的所有条目全部迁移。

    • 如果解析信号不兼容,则状态访问失败并出现异常。

Heap state backends (e.g. MemoryStateBackend, FsStateBackend) (堆状态后端,例如MemoryStateBackend,FsStateBackend )

  1. Register new state with a state serializer that has schema A (使用具有模式A的状态序列化程序注册新状态)

    • 注册的TypeSerializer由状态后端维护。

  2. Take a savepoint, serializing all state with schema A (获取savepoint,使用模式A序列化所有状态)

    • 通过TypeSerializer#snapshotConfiguration方法提取序列化程序快照。

    • 序列化程序快照将写入保存点。

    • 现在,状态对象被序列化为savepoint,以模式A编写。

  3. On restore, deserialize state into objects in heap (在还原时,将状态反序列化为堆中的对象)

    • 恢复先前的状态序列化程序的快照。

    • 识别模式A的先前序列化程序是通过TypeSerializerSnapshot#restoreSerializer()从序列化程序快照获取的,用于将状态字节反序列化为对象。

    • 从现在开始,所有的状态都已经反序列化了。

  4. Restored execution re-accesses previous state with new state serializer that has schema B(使用新状态序列化器的模式B来恢复执行重新访问恢复的状态字节)

    • 收到新的序列化程序后,它将通过TypeSerializer#resolveSchemaCompatibility提供给已恢复的先前序列化程序的快照,以检查架构兼容性。

    • 如果兼容性检查发出需要迁移的信号,则在这种情况下不会发生任何事情,因为对于堆后端,所有状态都已反序列化为对象。

    • 如果解析信号不兼容,则状态访问失败并出现异常。

  5. Take another savepoint, serializing all state with schema B(拿另一个savepoint,使用模式B序列化所有状态)

    • 与步骤2相同,但现在状态字节全部在模式B中。

8.3 Predefined convenient TypeSerializerSnapshot classes (预定义的方便的TypeSerializerSnapshot类)

Flink提供了两个可用于典型场景的抽象基类TypeSerializerSnapshot类:SimpleTypeSerializerSnapshotCompositeTypeSerializerSnapshot

提供这些预定义快照作为其序列化程序快照的序列化程序必须始终具有自己的独立子类实现。这对应于不在不同序列化程序之间共享快照类的最佳实践,这将在下一节中进行更全面的解释。

8.3.1 Implementing a SimpleTypeSerializerSnapshot (实现一个SimpleTypeSerializerSnapshot)

SimpleTypeSerializerSnapshot适用于没有任何状态或配置的序列化程序,这实际上意味着序列化程序的序列化模式仅由序列化程序的类定义。

使用SimpleTypeSerializerSnapshot作为序列化程序的快照类时,只有2种可能的兼容性解析结果:

  • TypeSerializerSchemaCompatibility.compatibleAsIs(), 如果新的序列化程序类保持相同,或

  • TypeSerializerSchemaCompatibility.incompatible(), 如果新的序列化程序类与前一个类不同。

下面是如何使用SimpleTypeSerializerSnapshot的示例,使用Flink的IntSerializer作为示例:

IntSerializer没有状态或配置。序列化格式仅由序列化程序类本身定义,并且只能由另一个IntSerializer读取。因此,它适合SimpleTypeSerializerSnapshot的用例。

SimpleTypeSerializerSnapshot的基础超类构造函数需要相应序列化程序的提供者实例,无论快照当前是在恢复还是在快照期间写入。该提供者用于创建还原序列化程序,以及类型检查以验证新的序列化程序是否与预期的序列化程序类相同。

8.3.2 Implementing a CompositeTypeSerializerSnapshot (实现一个CompositeTypeSerializerSnapshot)

CompositeTypeSerializerSnapshot适用于依赖多个嵌套序列化程序进行序列化的序列化程序。

在进一步解释之前,我们将依赖于多个嵌套序列化器的串行器称为此上下文中的“外部”序列化器。这方面的例子可能是MapSerializer,ListSerializer,GenericArraySerializer等。例如,考虑MapSerializer - 键和值序列化器将是嵌套的序列化器,而MapSerializer本身就是“外部”序列化器。

在这种情况下,外部序列化程序的快照还应包含嵌套序列化程序的快照,以便可以独立检查嵌套序列化程序的兼容性。在解析外部序列化程序的兼容性时,需要考虑每个嵌套序列化程序的兼容性。

提供CompositeTypeSerializerSnapshot以帮助实现这些复合序列化器的快照。它涉及读取和编写嵌套的序列化程序快照,以及解决最终的兼容性结果,同时考虑到所有嵌套序列化程序的兼容性。

下面是如何使用CompositeTypeSerializerSnapshot的示例,使用Flink的MapSerializer作为示例:

将新的序列化程序快照实现为CompositeTypeSerializerSnapshot的子类时,必须实现以下三种方法:

  • #getCurrentOuterSnapshotVersion(): 此方法定义当前外部序列化程序快照的序列化二进制格式的版本。

  • #getNestedSerializers(TypeSerializer): 给定外部序列化程序,返回其嵌套的序列化程序。

  • #createOuterSerializerWithNestedSerializers(TypeSerializer[]): 给定嵌套的序列化程序,创建外部序列化程序的实例。

上面的示例是CompositeTypeSerializerSnapshot,除了嵌套的序列化程序的快照之外,没有额外的信息要进行快照。因此,其外部快照版本可能永远不需要上升。但是,其他一些序列化程序包含一些需要与嵌套组件序列化程序一起保存的其他静态配置。这方面的一个例子是Flink的GenericArraySerializer,除了嵌套的元素序列化器之外,它还包含数组元素类的类。

在这些情况下,需要在CompositeTypeSerializerSnapshot上实现另外三种方法:

  • #writeOuterSnapshot(DataOutputView): 定义外部快照信息的写入方式。

  • #readOuterSnapshot(int, DataInputView, ClassLoader): 定义如何读取外部快照信息。

  • #isOuterSnapshotCompatible(TypeSerializer): 检查外部快照信息是否保持相同。

默认情况下,CompositeTypeSerializerSnapshot假定没有任何外部快照信息可供读/写,因此上述方法具有空的默认实现。如果子类具有外部快照信息,则必须实现所有三种方法。

下面是使用Flink的GenericArraySerializer作为示例,将CompositeTypeSerializerSnapshot用于具有外部快照信息的复合串行器快照的示例:

上面的代码片段中有两点需要注意。首先,由于此CompositeTypeSerializerSnapshot实现具有作为快照的一部分编写的外部快照信息,因此只要外部快照信息的序列化格式发生更改,就必须升级由getCurrentOuterSnapshotVersion()定义的外部快照版本。

其次,请注意在编写组件类时我们如何避免使用Java序列化,只需编写类名并在读回快照时动态加载它。避免用于编写串行器快照内容的Java序列化通常是一个很好的做法。有关这方面的更多详细信息将在下一节中介绍。

8.4 Implementation notes and best practices (实施说明和最佳实践)

1. Flink通过使用classname实例化它们来恢复序列化程序快照

序列化程序的快照是注册状态序列化的唯一真实来源,它是savepoint中读取状态的入口点。为了能够恢复和访问先前的状态,必须能够恢复先前的状态序列化程序的快照。

Flink通过首先使用其classname(与快照字节一起写入)实例化TypeSerializerSnapshot来恢复序列化程序快照。因此,为了避免出现意外的类名更改或实例化失败,TypeSerializerSnapshot类应该:

  • 避免被实现为匿名类或嵌套类,

  • 有一个public,无参的构造函数用于实例化

2. 避免跨不同的序列化程序共享相同的TypeSerializerSnapshot类

由于架构兼容性检查是通过序列化程序快照,因此让多个序列化程序返回与其快照相同的TypeSerializerSnapshot类会使
TypeSerializerSnapshot#resolveSchemaCompatibilityTypeSerializerSnapshot#restoreSerializer()方法的实现复杂化。

这也是对问题的严重分离; 单个序列化程序的序列化架构,配置以及如何还原它应该合并到其自己的专用TypeSerializerSnapshot类中。

3. 避免对序列化程序快照内容使用Java序列化

在编写持久化串行器快照的内容时,不应使用Java序列化。例如,一个序列化程序需要将其目标类型的类保留为其快照的一部分。有关该类的信息应该通过编写类名来保留,而不是使用Java直接序列化该类。读取快照时,将读取类名,并用于通过名称动态加载类。

此做法可确保始终可以安全地读取序列化程序快照。在上面的示例中,如果使用Java序列化持久保存类型类,则一旦类实现发生更改,快照可能不再可读,并且根据Java序列化细节不再具有二进制兼容性。

8.5 Migrating from deprecated serializer snapshot APIs before Flink 1.7 (从Flink 1.7之前已弃用的序列化程序快照API迁移)

本节是从Flink 1.7之前存在的序列化程序和序列化程序快照进行API迁移的指南。

在Flink 1.7之前,序列化程序快照是作为TypeSerializerConfigSnapshot实现的(现在已弃用,并且最终将被删除以完全替换为新的TypeSerializerSnapshot接口)。此外,序列化器模式兼容性检查的责任在TypeSerializer中生效,在TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)方法中实现。

新旧抽象之间的另一个主要区别是,已弃用的TypeSerializerConfigSnapshot无法实例化先前的序列化程序。因此,如果序列化程序仍将TypeSerializerConfigSnapshot的子类作为其快照返回,则序列化程序实例本身将始终使用Java序列化写入保存点,以便在还原时可以使用先前的序列化程序。这是非常不合需要的,因为恢复作业是否成功容易受到先前序列化程序类的可用性的影响,或者通常,是否可以使用Java序列化在恢复时读回序列化程序实例。这意味着您只能使用适用于您的状态的相同序列化程序,并且一旦您想要升级序列化程序类或执行架构迁移,就可能会出现问题。

为了面向未来并具有迁移状态序列化程序和模式的灵活性,强烈建议从旧的抽象中进行迁移。执行此操作的步骤如下:

  1. 实现TypeSerializerSnapshot的新子类。这将是序列化程序的新快照。

  2. TypeSerializer#snapshotConfiguration()方法中将新的TypeSerializerSnapshot作为序列化程序的序列化程序快照返回。

  3. 从Flink 1.7之前存在的savepoint还原作业,然后再次使用savepoint。请注意,在此步骤中,序列化程序的旧TypeSerializerConfigSnapshot必须仍存在于类路径中,并且不得删除TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)方法的实现。此过程的目的是使用新实现的序列化程序TypeSerializerSnapshot替换旧savepoint中编写的TypeSerializerConfigSnapshot

  4. 使用Flink 1.7获取保存点后,保存点将包含TypeSerializerSnapshot作为状态序列化程序快照,并且不再在保存点中写入序列化程序实例。此时,现在可以安全地删除旧抽象类的所有实现(从序列化器中删除旧的TypeSerializerConfigSnapshot实现以及TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot))。


本文分享自微信公众号 - 小晨说数据(flink-spark)。
如有侵权,请联系 [email protected] 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

03-24 21:12