Can I rely on an in-memory Java collection in Kafka Streams to buffer events by fine-tuning the punctuate and commit intervals?

Question

A custom processor buffers events in a plain java.util.List inside process(). This buffer is not a state store.

Every 30 seconds of WALL_CLOCK_TIME, punctuate() sorts this list and flushes it to the sink. Assume a single-partition source and sink. An exactly-once (EOS) processing guarantee is required.

I know that at any given time either process() or punctuate() is executing, never both.

I am concerned that this buffer is not backed by a changelog topic. Ideally, I believe it should have been a state store in order to support EOS.

But there is an argument that setting commit.interval.ms to more than 30 seconds, say 40 seconds, will make sure that the events in the buffer are never lost. In addition, since we are using WALL_CLOCK_TIME, punctuate() will always be called every 30 seconds regardless of whether any events arrive.
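
The configuration behind that argument could look roughly like the sketch below. It uses plain string keys with java.util.Properties so that it compiles without the Kafka jars; the application id and broker address are hypothetical placeholders, and "exactly_once_v2" assumes a recent Kafka Streams version (older releases use "exactly_once").

```java
import java.util.Properties;

final class BufferingAppConfig {
    // Builds the settings the argument relies on. StreamsConfig exposes the
    // same keys as constants; plain strings keep this sketch dependency-free.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "buffering-app");       // hypothetical name
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
        props.put("processing.guarantee", "exactly_once_v2");
        // Commit every 40 s, i.e. less often than the 30 s punctuation.
        props.put("commit.interval.ms", "40000");
        return props;
    }
}
```

Note that these settings only change how often offsets are committed. They do not make the contents of the in-memory list part of any commit.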

Is this a valid argument? In which cases would the events in the buffer be lost forever?

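@Override
public void init(ProcessorContext processorContext) {
    super.init(processorContext);
    // Plain in-memory buffer: not a state store, so no changelog topic backs it.
    this.buffer = new ArrayList<>();
    // Flush on wall-clock time, independent of whether new records arrive.
    context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);
}

// Punctuator callback: forward the buffered events downstream, sorted by id.
void flush(long timestamp) {
    LOG.info("Punctuator invoked.....");
    buffer.stream().sorted(Comparator.comparing(Customer::getId)).forEach(
            i -> context().forward(i.getId(), i)
    );
    // Empty the buffer so the same events are not forwarded again next time.
    buffer.clear();
}

@Override
public void process(String key, Customer value) {
    LOG.info("Processing {}", key);
    // The event exists only in this list until the next flush.
    buffer.add(value);
}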

Recommended answer

I worked out a few arguments against tuning the commit and punctuate intervals and calling this setup foolproof.

From the docs, on WALL_CLOCK_TIME:

It's possible to "miss" a punctuation if: with PunctuationType#WALL_CLOCK_TIME, on GC pause, too short interval
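
To make the "missed punctuation" behaviour concrete, here is a toy model in plain Java. It is not Kafka Streams code: the class and method names are made up for illustration, and it assumes that the stream thread can only check the schedule at certain moments and that missed ticks are skipped rather than replayed.

```java
import java.util.ArrayList;
import java.util.List;

final class WallClockPunctuationModel {
    /**
     * Returns the wall-clock times (seconds) at which a punctuation fires.
     * The thread only looks at the schedule at the given check times; each
     * check fires at most once, and ticks missed in between are skipped.
     */
    static List<Long> firedAt(long intervalSec, long[] checkTimesSec) {
        List<Long> fired = new ArrayList<>();
        long nextDue = intervalSec;
        for (long now : checkTimesSec) {
            if (now >= nextDue) {
                fired.add(now);
                // Jump to the first scheduled tick strictly after 'now'.
                nextDue = (now / intervalSec + 1) * intervalSec;
            }
        }
        return fired;
    }
}
```

For example, with a 20-second interval and a thread that is busy between second 22 and second 65, `firedAt(20, new long[] {20, 22, 65, 70})` returns `[20, 65]`: the ticks due at 40 and 60 collapse into a single late call.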

Ideal:

punctuate: |------------30s------------|------------30s------------|------------30s---

commit:    |------------30s------------|------------30s------------|------------30s---

Say process() took too long (for example 18 seconds), so punctuate() was not invoked for its second run at the 40th second because, as the docs mention, the interval was too short.

Now, if the application crashes at the 31st second, then even with EOS enabled the events in the buffer would already have been committed at the source. On restart, the buffer is lost.

punctuate: |------------30s------------|------------30s-------------|------------30s---

commit:    |------------30s------------|------------30s-------------|------------30s---
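
The crash scenario can be sketched as a small plain-Java model. Everything here is an illustrative assumption rather than Kafka Streams code: one task, one partition, a record is consumed when it arrives, a punctuation flushes the whole buffer, and a commit records the consumed position together with whatever was forwarded before it.

```java
import java.util.ArrayList;
import java.util.List;

final class InMemoryBufferCrashModel {
    /**
     * Returns the offsets that are lost for good after a crash: their source
     * offset was committed, so they are not re-read on restart, but no
     * punctuation flushed them before that commit.
     */
    static List<Integer> lostOffsets(long[] arrivalSec, long[] punctuateSec,
                                     long[] commitSec, long crashSec) {
        long lastCommit = -1;
        for (long c : commitSec) {
            if (c < crashSec && c > lastCommit) lastCommit = c;
        }
        List<Integer> lost = new ArrayList<>();
        for (int offset = 0; offset < arrivalSec.length; offset++) {
            long arrived = arrivalSec[offset];
            if (arrived > lastCommit) continue; // uncommitted: re-read on restart
            boolean flushed = false;
            for (long p : punctuateSec) {
                // Only a flush that happened before the commit is durable.
                if (p >= arrived && p <= lastCommit) { flushed = true; break; }
            }
            if (!flushed) lost.add(offset);
        }
        return lost;
    }
}
```

With illustrative numbers, records arriving at seconds 5, 12, 25 and 28, a single punctuation at second 20, a commit at second 30 and a crash at second 31, `lostOffsets(new long[] {5, 12, 25, 28}, new long[] {20}, new long[] {30}, 31)` returns `[2, 3]`: the two records that were sitting in the list when the offsets were committed.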

Hence the argument that tuning the commit and punctuate intervals removes the need for a state store is not valid.
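
For contrast, the following toy model shows why a changelog-backed buffer survives a restart. It is a plain-Java sketch with made-up names, where a List stands in for the changelog topic. In Kafka Streams itself the equivalent would be a state store registered on the topology and obtained through context().getStateStore(...), which is not shown here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ChangelogBackedBufferModel {
    private final List<String> changelog;            // stands in for the changelog topic
    private final List<String> buffer = new ArrayList<>();

    // Creating an instance replays the log, which models restore after a restart.
    ChangelogBackedBufferModel(List<String> changelog) {
        this.changelog = changelog;
        for (String entry : changelog) {
            if (entry.equals("CLEAR")) buffer.clear();
            else buffer.add(entry.substring("ADD:".length()));
        }
    }

    // Every mutation is logged as well as applied in memory.
    void add(String event) {
        changelog.add("ADD:" + event);
        buffer.add(event);
    }

    // Returns the buffered events in sorted order and empties the buffer.
    List<String> flush() {
        List<String> out = new ArrayList<>(buffer);
        Collections.sort(out);
        changelog.add("CLEAR");
        buffer.clear();
        return out;
    }
}
```

If an instance receives `add("c2")` and `add("c1")` and is then discarded, a new instance built on the same log still returns `[c1, c2]` from `flush()`, whereas a bare ArrayList would start out empty.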


10-22 15:24