我正在从数据流管道中的PubSub主题读取记录。 PubSub记录分为固定窗口,然后在每个窗口上分组。每个窗口都按序列号排序,因为我们需要使用beam.SortValues依次处理这些记录。然后,我将记录写入Cloud BigTable
管道的问题是数据新鲜度和系统延迟。数据新鲜度似乎停滞不前,水印停止前进。
我正在使用以下窗口化策略在GroupByKey步骤之后发出记录:
PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords.apply("Raw to String", ParDo.of(new LogsFn()))
.apply("Window", Window
.<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardSeconds(10)))
.triggering(Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(500),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))))
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
);
我认为问题可能出在窗口策略上。基本上,我想执行以下操作:从PubSub中将记录读入1分钟的FixedWindows中,对窗口进行排序并写入BigTable。如果我使用默认触发器,则GroupByKey步骤不会发出任何结果。有人可以帮我弄这个吗?
最佳答案
读取您的代码,现在看来您的早期触发器和窗口大小向后。您的窗口化策略实际上是:
10秒活动时间固定窗口
1分钟的处理时间或窗格中的500个元素的复合早期触发器。
晚事件被丢弃。
如果您只需要1分钟的事件时间窗口,则需要满足以下条件:
PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords.apply("Raw to String", ParDo.of(new LogsFn()))
.apply("Window", Window
.<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardMinutes(1)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
.withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS));
默认情况下,始终将Fire设为OnTimeBehavior,但为了使其可读性,我们可以使其明确。如果您需要复合触发器,则可以重新添加-我怀疑您想触发1个10秒或500个元素。