我最近一直在处理一个使我发疯的问题,因为它一旦部署在Dataflow中就发生了,但是从来没有出现在一切正常的地方。仅供参考,我正在使用Apache Beam 2.9.0
。
我正在定义一个DoFn步骤,该步骤将事件缓冲一段特定的时间(例如5分钟),然后在该时间之后触发一些逻辑。
@StateId("bufferSize")
private final StateSpec<ValueState<Integer>> bufferSizeSpec =
StateSpecs.value(VarIntCoder.of());
@StateId("eventsBuffer")
private final StateSpec<BagState<String>> eventsBufferSpec =
StateSpecs.bag(StringUtf8Coder.of());
@TimerId("trigger")
private final TimerSpec triggerSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
我有
processElement
逻辑来添加传入事件...@ProcessElement
public void processElement(
ProcessContext processContext,
@StateId("bufferSize") ValueState<Integer> bufferSize,
@StateId("eventsBuffer") BagState<String> eventsBuffer,
@TimerId("trigger") Timer triggerTimer) {
triggerTimer.offset(Duration.standardMinutes(1)).setRelative();
int size = ObjectUtils.firstNonNull(bufferSize.read(), 0);
eventsBuffer.add(processContext.element().getValue());
bufferSize.write(++size);
}
然后我的扳机...
@OnTimer("trigger")
public void onExpiry(
@StateId("bufferSize") ValueState<Integer> bufferSize,
@StateId("eventsBuffer") BagState<String> eventsBuffer) throws Exception {
doSomethingHere();
}
每当执行
onExpiry
时,其接收的参数为null和0。集群方面会发生什么?
编辑:
在DoFn之前使用的窗口。
.apply(
"1min Window",
Window
.<KV<String, String>>into(
FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
最佳答案
重要的是要注意,当窗口过期时,将保留键窗口元组的状态,该状态将变为GC。
因此,对于键1,您的Bag对象将具有{key-1,TimeInterval-1},{key-1,TimeInterval-2}等数据。
如果要在输入值和计时器之间使用强语义,则可能需要探索EventTime计时器的用法。