This article describes how to handle Flink state being empty (reinitialized) after a rerun. It should be a useful reference for anyone running into the same problem.

Problem description

I'm trying to connect two streams; the first one is persisted in a MapValueState. RocksDB saves the data in the checkpoint folder, but after a new run the state is empty. I run the job both locally and on a Flink cluster: I cancel the job submitted to the cluster and then simply rerun it locally.

env.setStateBackend(new RocksDBStateBackend(..))
env.enableCheckpointing(1000)
...
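
For reference, a minimal, self-contained sketch of this setup could look as follows; the checkpoint URI is a hypothetical local path, not taken from the question:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// RocksDB keeps working state on local disk and writes checkpoints to the given URI
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"))
// trigger a checkpoint every 1000 ms
env.enableCheckpointing(1000)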

val productDescriptionStream: KeyedStream[ProductDescription, String] = env.addSource(..)
  .keyBy(_.id)

val productStockStream: KeyedStream[ProductStock, String] = env.addSource(..)
  .keyBy(_.id)

productDescriptionStream
  .connect(productStockStream)
  .process(ProductProcessor())
  .setParallelism(1)

env.execute("Product aggregator")

ProductProcessor

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.util.Collector

case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product] {

  private[this] lazy val stateDescriptor: MapStateDescriptor[String, ProductDescription] =
    new MapStateDescriptor[String, ProductDescription](
      "productDescription",
      createTypeInformation[String],
      createTypeInformation[ProductDescription]
    )

  private[this] lazy val states: MapState[String, ProductDescription] =
    getRuntimeContext.getMapState(stateDescriptor)

  override def processElement1(value: ProductDescription,
      ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context,
      out: Collector[Product]): Unit = {
    states.put(value.id, value)
  }

  override def processElement2(value: ProductStock,
      ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context,
      out: Collector[Product]): Unit = {
    if (states.contains(value.id)) {
      val product = Product(
        id = value.id,
        description = Some(states.get(value.id).description),
        stock = Some(value.stock),
        updatedAt = value.updatedAt)
      out.collect(product)
    }
  }
}

Recommended answer

Checkpoints are created by Flink for recovering from failures, not for resuming after a manual shutdown. When a job is canceled, the default behavior is for Flink to delete the checkpoints. Since the job can no longer fail, it won't need to recover.

You have several options:

(1) Configure your checkpointing to retain checkpoints when a job is cancelled:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
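
The same setting can be applied from the Scala job in the question; this is just a sketch assuming the env defined there:

import org.apache.flink.streaming.api.environment.CheckpointConfig

// keep externalized checkpoints when the job is cancelled, so a later run can restore from them
env.getCheckpointConfig.enableExternalizedCheckpoints(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)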

Then when you restart the job you'll need to indicate that you want it restarted from a specific checkpoint:

flink run -s <checkpoint-path> ...

Otherwise, whenever you start a job it will begin with an empty state backend.
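
For example, a resumed run might look like the following; the checkpoint path and jar name are hypothetical:

flink run -s file:///tmp/flink-checkpoints/<job-id>/chk-42 product-aggregator.jar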

(2) Instead of canceling the job, use stop with savepoint:

flink stop [-p targetDirectory] [-d] <jobID>

after which you'll again need to use flink run -s ... to resume from the savepoint.
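
Put together, that workflow might look like this; the savepoint directory, job ID, and jar name are hypothetical:

# stop the job and write a savepoint to the target directory
flink stop -p file:///tmp/flink-savepoints <jobID>
# resume later from the savepoint path printed by the stop command
flink run -s file:///tmp/flink-savepoints/savepoint-<jobID>-<suffix> product-aggregator.jar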

Stop with a savepoint is a cleaner approach than relying on there being a recent checkpoint to fall back to.

(3) Or you could use Ververica Platform Community Edition, which raises the level of abstraction to the point where you don't have to manage these details yourself.

This concludes this article on Flink state being empty (reinitialized) after a rerun. We hope the recommended answer above helps.
