我有事件(ProductOrderRequested,ProductColorChanged,ProductDelivered ...),我想建立我产品的黄金记录。
但是我的目标是逐步建立黄金记录:每次活动都会为我提供产品的更新状态,并且我需要存储每个状态版本以实现可追溯性
我有一个非常简单的管道(代码胜于单词):
events
.apply("SessionWindow", Window.
<KV<String, Event>>into(Sessions.withGapDuration(gapSession)
.triggering(<early and late data trigger>))
.apply("GroupByKey", GroupByKey.create())
.apply("ComputeState", ParDo.of(new StatefulFn()))
我的问题是对于给定的窗口,我必须根据以下内容计算新状态:
先前状态(即先前窗口的计算状态)
收到的事件
我想避免调用外部服务来获取先前的状态,而是获取先前窗口的状态。有可能吗?
最佳答案
在Apache Beam中,状态始终在每个窗口范围内(另请参见此answer)。因此,我只能考虑重新进入全局窗口并在其中处理状态。在此全局StatefulFn
中,您可以存储和处理先前的状态。
然后看起来像这样:
events
.apply("SessionWindow", Window.
<KV<String, Event>>into(Sessions.withGapDuration(gapSession)
.triggering(<early and late data trigger>))
.apply("GroupByKey", GroupByKey.create())
.apply("Re-window into Global Window", Window.
<KV<String, Event>>into(new GlobalWindows())
.triggering(<early and late data trigger>))
.apply("ComputeState", ParDo.of(new StatefulFn()))
另请注意,到目前为止,Apache Beam不支持合并窗口的有状态处理(请参见此issue)。因此,由于状态未合并,因此当触发器触发会话窗口的早期或晚期结果时,基于会话窗口的
StatefulFn
将无法正常工作。这是使用非合并窗口(如全局窗口)的另一个原因。