状态从本质上来说,是Flink算子子任务的一种本地数据,为了保证数据可恢复性,使用Checkpoint机制来将状态数据持久化输出到存储空间上。

  状态相关的主要逻辑有两项:

  一、将算子子任务本地内存数据在Checkpoint时snapshot写入存储;

  二、初始化或重启应用时,以一定的逻辑从存储中读出并变为算子子任务的本地内存数据。

  Keyed State对这两项内容做了更完善的封装,开发者可以开箱即用。对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态,可以访问和修改该状态。Flink的算子子任务上的数据在程序重启、横向伸缩等场景下不能保证百分百的一致性。换句话说,重启Flink应用后,某个数据流元素不一定会和上次一样,还能流入该算子子任务上。因此,我们需要根据自己的业务场景来设计snapshot和restore的逻辑。为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction接口类。

public interface CheckpointedFunction {

  // Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化
    void snapshotState(FunctionSnapshotContext context) throws Exception;

  // 初始化时会调用这个方法,向本地状态中填充数据
    void initializeState(FunctionInitializationContext context) throws Exception;

}

  在Flink的Checkpoint机制下,当一次snapshot触发后,snapshotState会被调用,将本地状态持久化到存储空间上。这里我们可以先不用关心snapshot是如何被触发的,暂时理解成snapshot是自动触发的,后续文章会介绍Flink的Checkpoint机制。initializeState在算子子任务初始化时被调用,初始化包括两种场景:一、整个Flink作业第一次执行,状态数据被初始化为一个默认值;二、Flink作业重启,之前的作业已经将状态输出到存储,通过这个方法将存储上的状态读出并填充到这个本地状态中。

  Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。这里我们来看一个Flink官方提供的Sink案例以了解CheckpointedFunction的工作原理。

实例一:

// BufferingSink需要继承SinkFunction以实现其Sink功能,同时也要继承CheckpointedFunction接口类
class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  // Operator List State句柄
  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  // 本地缓存
  private val bufferedElements = ListBuffer[(String, Int)]()

  // Sink的核心处理逻辑,将上游数据value输出到外部系统
  override def invoke(value: (String, Int), context: Context): Unit = {
    // 先将上游数据缓存到本地的缓存
    bufferedElements += value
    // 当本地缓存大小到达阈值时,将本地缓存输出到外部系统
    if (bufferedElements.size == threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      // 清空本地缓存
      bufferedElements.clear()
    }
  }

  // 重写CheckpointedFunction中的snapshotState
  // 将本地缓存snapshot保存到存储上
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // 将之前的Checkpoint清理
    checkpointedState.clear()
    // 将最新的数据写到状态中
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  // 重写CheckpointedFunction中的initializeState
  // 初始化状态
  override def initializeState(context: FunctionInitializationContext): Unit = {
    // 注册ListStateDescriptor
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )

    // 从FunctionInitializationContext中获取OperatorStateStore,进而获取ListState
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    // 如果是作业重启,读取存储中的状态数据并填充到本地缓存中
    if(context.isRestored) {
      for(element <- checkpointedState.get()) {
        bufferedElements += element
      }
    }
  }

}

  上面的代码在输出到Sink之前,先将数据放在本地缓存中,并定期进行snapshot,这实现了批量输出的功能,批量输出能够减少网络等开销。同时,程序能够保证数据一定会输出外部系统,因为即使程序崩溃,状态中存储着还未输出的数据,下次启动后还会将这些未输出数据读取到内存,继续输出到外部系统。

  注册和使用Operator State的代码和Keyed State相似,也是先注册一个StateDescriptor,并指定状态名字和数据类型,然后从FunctionInitializationContext中获取OperatorStateStore,进而获取ListState。如果是UnionListState,那么代码改为:context.getOperatorStateStore.getUnionListState

val descriptor = new ListStateDescriptor[(String, Long)](
    "buffered-elements",
    TypeInformation.of(new TypeHint[(String, Long)]() {})
)

checkpointedState = context.getOperatorStateStore.getListState(descriptor)

状态的初始化逻辑中,我们用context.isRestored来判断是否为作业重启,这样可以从之前的Checkpoint中恢复并写到本地缓存中。

  注意,CheckpointedFunction接口类的initializeState方法的参数为FunctionInitializationContext,基于这个上下文参数我们不仅可以通过getOperatorStateStore获取    OperatorStateStore,也可以通过getKeyedStateStore来获取KeyedStateStore,进而通过getStategetMapState等方法获取Keyed State,比如:context.getKeyedStateStore().getState(valueDescriptor)。这与在Rich函数类中使用Keyed State的方式并不矛盾。CheckpointedFunction是Flink有状态计算的最底层接口,它提供了最丰富的状态接口。

  ListCheckpointed接口类是CheckpointedFunction接口类的一种简写,ListCheckpointed提供的功能有限,只支持均匀分布的ListState,不支持全量广播的UnionListState。

public interface ListCheckpointed<T extends Serializable> {

    // Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

  // 从上次Checkpoint中恢复数据到本地内存
    void restoreState(List<T> state) throws Exception;
}

  跟CheckpointedFunction中的snapshotState方法一样,这里的snapshotState也是在做备份,但这里的参数列表更加精简,其中checkpointId是一个单调递增的数字,用来表示某次Checkpoint,timestamp是Checkpoint发生的实际时间,这个方法以列表形式返回需要写入存储的状态。restoreState方法用来初始化状态,包括作业第一次启动或者作业失败重启。参数是一个列表形式的状态,是均匀分布给这个算子子任务的状态数据。

实例二:

  一个函数可以实现ListCheckpointed接口来处理操作符的list state。ListCheckpointed接口无法处理ValueState和ListState,因为这些状态是注册在状态后端的。操作符状态类似于成员变量,和状态后端的交互通过ListCheckpointed接口的回调函数实现。接口提供了两个方法:

// 返回函数状态的快照,返回值为列表
snapshotState(checkpointId: Long, timestamp: Long): java.util.List[T]
// 从列表恢复函数状态
restoreState(java.util.List[T] state): Unit

  当Flink触发stateful functon的一次checkpoint时,snapshotState()方法会被调用。方法接收两个参数,checkpointId为唯一的单调递增的检查点Id,timestamp为当master机器开始做检查点操作时的墙上时钟(机器时间)。方法必须返回序列化好的状态对象的列表。

  当宕机程序从检查点或者保存点恢复时会调用restoreState()方法。restoreState使用snapshotState保存的列表来恢复。

下面的例子展示了如何实现ListCheckpointed接口。业务场景为:一个对每一个并行实例的超过阈值的温度的计数程序。

class HighTempCounter(val threshold: Double)
    extends RichFlatMapFunction[SensorReading, (Int, Long)]
    with ListCheckpointed[java.lang.Long] {

  // index of the subtask
  private lazy val subtaskIdx = getRuntimeContext
    .getIndexOfThisSubtask
  // local count variable
  private var highTempCnt = 0L

  override def flatMap(
      in: SensorReading,
      out: Collector[(Int, Long)]): Unit = {
    if (in.temperature > threshold) {
      // increment counter if threshold is exceeded
      highTempCnt += 1
      // emit update with subtask index and counter
      out.collect((subtaskIdx, highTempCnt))
    }
  }

  override def restoreState(
      state: util.List[java.lang.Long]): Unit = {
    highTempCnt = 0
    // restore state by adding all longs of the list
    for (cnt <- state.asScala) {
      highTempCnt += cnt
    }
  }

  override def snapshotState(
      chkpntId: Long,
      ts: Long): java.util.List[java.lang.Long] = {
    // snapshot state as list with a single count
    java.util.Collections.singletonList(highTempCnt)
  }
}

  上面的例子中,每一个并行实例都计数了本实例有多少温度值超过了设定的阈值。例子中使用了操作符状态,并且每一个并行实例都拥有自己的状态变量,这个状态变量将会被检查点操作保存下来,并且可以通过使用ListCheckpointed接口来恢复状态变量。

  看了上面的例子,我们可能会有疑问,那就是为什么操作符状态是状态对象的列表。这是因为列表数据结构支持包含操作符状态的函数的并行度改变的操作。为了增加或者减少包含了操作符状态的函数的并行度,操作符状态需要被重新分区到更多或者更少的并行任务实例中去。而这样的操作需要合并或者分割状态对象。而对于每一个有状态的函数,分割和合并状态对象都是很常见的操作,所以这显然不是任何类型的状态都能自动完成的。

  通过提供一个状态对象的列表,拥有操作符状态的函数可以使用snapshotState()方法和restoreState()方法来实现以上所说的逻辑。snapshotState()方法将操作符状态分割成多个部分,restoreState()方法从所有的部分中将状态对象收集起来。当函数的操作符状态恢复时,状态变量将被分区到函数的所有不同的并行实例中去,并作为参数传递给restoreState()方法。如果并行任务的数量大于状态对象的数量,那么一些并行任务在开始的时候是没有状态的,所以restoreState()函数的参数为空列表。

  再来看一下上面的程序,我们可以看到操作符的每一个并行实例都暴露了一个状态对象的列表。如果我们增加操作符的并行度,那么一些并行任务将会从0开始计数。为了获得更好的状态分区的行为,当HighTempCounter函数扩容时,我们可以按照下面的程序来实现snapshotState()方法,这样就可以把计数值分配到不同的并行计数中去了。

override def snapshotState(
    chkpntId: Long,
    ts: Long): java.util.List[java.lang.Long] = {
  // split count into ten partial counts
  val div = highTempCnt / 10
  val mod = (highTempCnt % 10).toInt
  // return count as ten parts
  (List.fill(mod)(new java.lang.Long(div + 1)) ++
    List.fill(10 - mod)(new java.lang.Long(div))).asJava
}

 

02-14 04:53