Problem description
A custom processor buffers events in a plain java.util.List inside process(). This buffer is not a state store.
Every 30 seconds of WALL_CLOCK_TIME, punctuate() sorts this list and flushes it to the sink. Assume a single-partition source and a single-partition sink. An EOS (exactly-once) processing guarantee is required.
I know that at any given time either process() or punctuate() is executing, never both.
I am concerned that this buffer is not backed by a changelog topic. Ideally, I believe it should have been a state store in order to support EOS.
But there is an argument that setting commit.interval.ms to more than 30 seconds, say 40 seconds, will make sure that the events in the buffer are never lost. And since we are using WALL_CLOCK_TIME, punctuate() will always be called every 30 seconds regardless of whether we have events or not.
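For concreteness, the settings this argument proposes might look like the sketch below. It uses plain java.util.Properties with the string config keys processing.guarantee and commit.interval.ms. The value exactly_once_v2 assumes Kafka Streams 3.0 or later (older versions use exactly_once), and the class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper; only the two config keys and their values matter here.
class ProposedStreamsSettings {

    static Properties build() {
        Properties props = new Properties();
        // Exactly-once processing (assumes Kafka Streams 3.0+ for this value).
        props.put("processing.guarantee", "exactly_once_v2");
        // Commit every 40 seconds, i.e. longer than the 30-second punctuation interval.
        props.put("commit.interval.ms", "40000");
        return props;
    }
}
```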
Is this a valid argument? In which cases would the events in the buffer be lost forever?
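@Override
public void init(ProcessorContext processorContext) {
    super.init(processorContext);
    // Plain in-memory buffer: not a state store, so no changelog topic backs it.
    this.buffer = new ArrayList<>();
    // Wall-clock punctuation; the interval scheduled here is 20 seconds.
    context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);
}

void flush(long timestamp) {
    LOG.info("Punctuator invoked.....");
    // Forward the buffered events downstream in id order.
    buffer.stream()
          .sorted(Comparator.comparing(o -> o.getId()))
          .forEach(i -> context().forward(i.getId(), i));
    // Empty the buffer so forwarded events are not sent again on the next punctuation.
    buffer.clear();
}

@Override
public void process(String key, Customer value) {
    LOG.info("Processing {}", key);
    buffer.add(value);
}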
Recommended answer
I worked out a few arguments against tuning the commit and punctuate intervals and calling this setup foolproof.
From the docs, on WALL_CLOCK_TIME:
It's possible to "miss" a punctuation if: with PunctuationType#WALL_CLOCK_TIME, on GC pause, too short interval
Ideal:
punctuate :|------------30s------------|------------30s-----------|------------30s---
commit    :|------------30s------------|------------30s-----------|------------30s---
Say process() took too much time (say 18 seconds), so punctuate() was not invoked for the second run at the 40th second because, as the docs mention, the interval was too short.
Now, if the application crashes at the 31st second, then even with EOS enabled the offsets of the events still sitting in the buffer will already have been committed at the source. On restart, the buffer is lost.
Possible:
punctuate :|------------30s------------|------------30s-------------|------------30s---
commit    :|------------30s------------|------------30s-------------|------------30s---
Hence the argument that tuning the commit and punctuate intervals removes the need for a state store is not valid.
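The crash scenario can be checked with a toy model. The sketch below is plain Java, not Kafka Streams code, and rests on simplifying assumptions: one event arrives per second with an id equal to that second, a commit atomically publishes the forwarded output together with the source offsets, and a changelog-backed buffer is restored to its contents as of the last commit. All class, method, and parameter names are hypothetical, and the numbers (punctuation fired only at second 20, commit every 30 seconds, crash at second 31) are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model of the scenario; this is not Kafka Streams code. */
class BufferLossSimulation {

    /** Returns ids whose offsets were committed but that are in neither the sink nor a restored buffer. */
    static List<Integer> lostEvents(Set<Integer> punctuationsFired,
                                    int commitIntervalSeconds,
                                    int crashAtSecond,
                                    boolean bufferHasChangelog) {
        List<Integer> buffer = new ArrayList<>();            // in-memory buffer
        List<Integer> pendingSink = new ArrayList<>();       // forwarded, transaction still open
        List<Integer> committedSink = new ArrayList<>();     // output made durable by a commit
        List<Integer> changelogSnapshot = new ArrayList<>(); // buffer contents as of the last commit
        int committedOffset = 0;                             // last committed source offset

        for (int second = 1; second < crashAtSecond; second++) {
            buffer.add(second);                              // process(): buffer the event
            if (punctuationsFired.contains(second)) {        // a punctuation that actually ran
                pendingSink.addAll(buffer);
                buffer.clear();
            }
            if (second % commitIntervalSeconds == 0) {       // commit: offsets and output together
                committedSink.addAll(pendingSink);
                pendingSink.clear();
                committedOffset = second;
                changelogSnapshot = new ArrayList<>(buffer);
            }
        }

        // Crash: the in-memory buffer and the open transaction are gone.
        List<Integer> restoredBuffer = bufferHasChangelog ? changelogSnapshot : new ArrayList<>();

        // Events at or before the committed offset are never re-read from the source.
        List<Integer> lost = new ArrayList<>();
        for (int id = 1; id <= committedOffset; id++) {
            if (!committedSink.contains(id) && !restoredBuffer.contains(id)) {
                lost.add(id);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // prints [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
        System.out.println(lostEvents(Set.of(20), 30, 31, false));
        // prints []
        System.out.println(lostEvents(Set.of(20), 30, 31, true));
    }
}
```

In this model the plain list loses events 21 to 30, because their offsets were committed at second 30 while they were still only in memory. The changelog-backed variant loses nothing, which is the behaviour a state store is meant to provide.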