


I have a custom processor that buffers events in a plain java.util.List in process(); this buffer is not a state store.


Every 30 seconds of wall-clock time (WALL_CLOCK_TIME), punctuate() sorts this list and flushes it to the sink. Assume a single-partition source and sink. An exactly-once (EOS) processing guarantee is required.


I know that at any given time, either process() or punctuate() is executing, never both.


I am concerned that this buffer is not backed by a changelog topic. Ideally, I believe it should be a state store in order to support EOS.
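To see why a state store helps, here is a toy model (plain Java, not the Kafka Streams state-store API) of a buffer whose every mutation is also appended to a log standing in for a changelog topic; the class and method names are hypothetical. Replaying the log after a crash rebuilds the buffer, whereas the heap-only list in the question has nothing to replay. In Kafka Streams with EOS, writes to a logged store's changelog are produced in the same transaction as the input-offset commit, which is what keeps a restored store consistent with the committed offsets.

```java
import java.util.ArrayList;
import java.util.List;

class ChangeloggedBufferSketch {
    // Toy stand-in for a changelog topic: an append-only log of buffer mutations.
    final List<String> changelog = new ArrayList<>();
    // The in-memory buffer itself, lost whenever the process dies.
    final List<String> buffer = new ArrayList<>();

    void add(String value) {
        buffer.add(value);
        changelog.add("ADD " + value); // every mutation is logged as well
    }

    void clear() {
        buffer.clear();
        changelog.add("CLEAR");
    }

    // After a crash only the log survives; replaying it rebuilds the buffer.
    static List<String> restore(List<String> log) {
        List<String> restored = new ArrayList<>();
        for (String entry : log) {
            if (entry.equals("CLEAR")) {
                restored.clear();
            } else {
                restored.add(entry.substring("ADD ".length()));
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        ChangeloggedBufferSketch b = new ChangeloggedBufferSketch();
        b.add("a");
        b.add("b");
        b.clear(); // a flush empties the buffer
        b.add("c");
        System.out.println(restore(b.changelog)); // [c]
    }
}
```

The design point is that the buffer's contents can be recomputed from durable data, so losing the JVM heap no longer loses events.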


However, there is an argument that setting commit.interval.ms to more than 30 seconds (say, 40 seconds) ensures that events in the buffer are never lost, and that, since we use WALL_CLOCK_TIME, punctuate() will always be called every 30 seconds regardless of whether any events arrive.
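A minimal sketch of the two settings in question, using plain java.util.Properties so it runs without the Kafka jars. The keys processing.guarantee and commit.interval.ms are real Kafka Streams configs; the value exactly_once_v2 assumes Kafka Streams 3.0 or later (older versions use exactly_once), and the class and method names are hypothetical. A real application would also need application.id and bootstrap.servers, and would pass these Properties to the KafkaStreams constructor.

```java
import java.util.Properties;

class StreamsConfigSketch {
    // Hypothetical helper: the two settings under discussion, as plain string keys.
    static Properties eosConfig(long commitIntervalMs) {
        Properties props = new Properties();
        // "exactly_once_v2" requires Kafka Streams 3.0+; older versions use "exactly_once".
        props.put("processing.guarantee", "exactly_once_v2");
        // With EOS the default commit.interval.ms is 100 ms, so 40 s must be set explicitly.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = eosConfig(40_000L);
        // A real app would also set "application.id" and "bootstrap.servers".
        System.out.println(props.getProperty("processing.guarantee")); // exactly_once_v2
        System.out.println(props.getProperty("commit.interval.ms"));   // 40000
    }
}
```

Note that commit.interval.ms only controls how often offsets (and, under EOS, the open transaction) are committed; it does not tie commits to punctuations, and Kafka Streams may also commit outside that schedule, for example when tasks are revoked during a rebalance.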


Is this a valid argument? In what cases would the events in the buffer be lost forever?

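public void init(ProcessorContext processorContext) {
    super.init(processorContext); // AbstractProcessor stores the context that context() returns
    this.buffer = new ArrayList<>();
    // Fire flush() on a wall-clock schedule, whether or not records arrive.
    context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);
}

void flush(long timestamp) {
    LOG.info("Punctuator invoked.....");
    // Sort by id and forward downstream; the buffer exists only on the JVM heap.
    buffer.stream().sorted(Comparator.comparing(Customer::getId)).forEach(
            c -> context().forward(c.getId(), c));
    buffer.clear();
}

public void process(String key, Customer value) {
    LOG.info("Processing {}", key);
    buffer.add(value); // buffered in memory only, not in a state store
}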



I have worked out a few arguments against tuning the commit and punctuate intervals and calling this setup foolproof.


From the docs on WALL_CLOCK_TIME:


It's possible to "miss" a punctuation if: with PunctuationType#WALL_CLOCK_TIME, on GC pause, too short interval
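The docs quote above can be illustrated with a toy, single-threaded model (not Kafka code) of a stream thread that only checks for a due wall-clock punctuation between process() calls. The class and method names are hypothetical, and the rule that missed slots are skipped rather than replayed reflects our understanding of Kafka Streams' scheduling, so treat it as an assumption. With the 20-second schedule from the question's init(), a 45-second stall (for example a long GC pause) collapses the slots at 40 s and 60 s into a single late punctuation.

```java
import java.util.ArrayList;
import java.util.List;

class WallClockPunctuationSketch {
    // Toy model (not Kafka code): one thread runs process() calls back to back and only
    // checks whether a wall-clock punctuation is due between two calls.
    // Returns the wall-clock times (in seconds) at which punctuate() actually fired.
    static List<Long> firedPunctuations(long intervalSec, long[] processDurationsSec) {
        List<Long> fired = new ArrayList<>();
        long now = 0;
        long next = intervalSec; // first scheduled slot
        for (long d : processDurationsSec) {
            now += d; // the thread is busy in process() (or paused) for d seconds
            if (now >= next) {
                fired.add(now); // punctuate() runs once, possibly late
                // Assumption: every slot already in the past is skipped, not replayed.
                next = (now / intervalSec + 1) * intervalSec;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // 20 s schedule; the second call stalls for 45 s, spanning the 40 s and 60 s slots.
        System.out.println(firedPunctuations(20, new long[] {20, 45, 15})); // [20, 65, 80]
    }
}
```

In this model the slots at 40 s and 60 s never get their own punctuate() call; whatever was buffered waits until 65 s, while commits can keep happening on their own schedule.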


punctuate :|------------30s------------|------------30s------------|------------30s---
commit    :|------------30s------------|------------30s------------|------------30s---


Say process() took too long (e.g. 18 seconds), so punctuate() was not invoked for its second run at the 40th second because, as the docs mention, the interval was too short.


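Now, if the application crashes at the 31st second, then even with EOS enabled, the source offsets of the buffered events have already been committed by the commit at the 30th second. On restart those events are not re-read, and the in-memory buffer is gone, so they are lost.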
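The crash scenario can be checked with a small toy model (plain Java, not Kafka code; the class and method names are hypothetical). It assumes EOS semantics as stated in the comments: input offsets and forwarded output are committed together, and anything after the last commit is rolled back. Under those assumptions an event is lost whenever it arrived after the last flush but before the last commit, which in this model happens both for the proposed 30 s punctuate / 40 s commit setup and for the 20 s / 30 s timeline above.

```java
import java.util.ArrayList;
import java.util.List;

class BufferLossSketch {
    // Toy model (not Kafka code) of an in-memory buffer under EOS. Assumptions: a commit at
    // time C atomically commits the input offsets of every event that arrived before C together
    // with everything punctuate() forwarded before C; a crash aborts everything after the last
    // commit; no two timestamps are equal. Returns arrival times of events that are lost for good.
    static List<Long> lostEvents(long[] eventTimes, long[] flushTimes,
                                 long[] commitTimes, long crashTime) {
        long lastCommit = Long.MIN_VALUE; // last commit that completed before the crash
        for (long c : commitTimes) {
            if (c < crashTime) lastCommit = Math.max(lastCommit, c);
        }
        long lastFlush = Long.MIN_VALUE; // last flush whose output that commit covered
        for (long f : flushTimes) {
            if (f < lastCommit) lastFlush = Math.max(lastFlush, f);
        }
        List<Long> lost = new ArrayList<>();
        for (long t : eventTimes) {
            // Offset committed, but the event was still only in the heap buffer.
            if (t < lastCommit && t > lastFlush) lost.add(t);
        }
        return lost;
    }

    public static void main(String[] args) {
        // Proposed setup: punctuate every 30 s, commit every 40 s, crash at 41 s.
        System.out.println(lostEvents(new long[] {10, 25, 33, 38}, new long[] {30}, new long[] {40}, 41)); // [33, 38]
        // Timeline from the post: flush at 20 s, commit at 30 s, crash at 31 s.
        System.out.println(lostEvents(new long[] {5, 12, 22, 27}, new long[] {20}, new long[] {30}, 31)); // [22, 27]
    }
}
```

Because the two intervals are independent, some commit can almost always land between a flush and the next one, so a longer commit interval narrows the window rather than closing it.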



Hence, the argument that tuning the commit and punctuate intervals removes the need for a state store is not valid.


10-22 15:24