我只是从使用Scala的Apache Flink开始。有人可以告诉我如何从我当前的数据流中创建滞后流(被k个事件或k个时间单位滞后)吗?

基本上,我想在数据流上实现自动回归模型(在具有时间滞后版本的流上进行线性回归)。因此,需要一种类似于以下伪代码的方法。

val ds : DataStream = ...

val laggedDS : DataStream = ds.map(lag _)

def lag(ds : DataStream, k : Time) : DataStream = {

}


如果每个事件间隔为1秒,并且有2秒的滞后,我希望这样的示例输入和输出。

输入:1,2,3,4,5,6,7 ...
输出:NA,NA,1、2、3、4、5 ...

最佳答案

鉴于您的要求正确,我将其实现为带有FIFO队列的FlatMapFunction。队列缓冲k事件,并在新事件到达时发出头部。如果需要容错流应用程序,则必须将队列注册为状态。然后Flink将负责检查点状态(即队列)并在发生故障时将其恢复。

FlatMapFunction可能如下所示:

class Lagger(val k: Int)
    extends FlatMapFunction[X, X]
    with Checkpointed[mutable.Queue[X]]
{

  var fifo: mutable.Queue[X] = new mutable.Queue[X]()

  override def flatMap(value: X, out: Collector[X]): Unit = {
    // add new element to queue
    fifo.enqueue(value)
    if (fifo.size == k + 1) {
      // remove head element and emit
      out.collect(fifo.dequeue())
    }
  }

  // restore state
  override def restoreState(state: mutable.Queue[X]) = { fifo = state }

  // get state to checkpoint
  override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo

}


返回包含时间滞后的元素会更加复杂。这将需要计时器线程进行发射,因为仅当新元素到达时才调用该函数。

关于scala - Apache Flink:创建滞后的数据流,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/39199358/

10-13 04:35