本文介绍了我可以依靠 Kafka 流中的内存 Java 集合来通过微调标点和提交间隔来缓冲事件吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 一个自定义处理器,它在 process() 中的一个简单的 java.util.List 中缓冲事件 - 此缓冲区不是状态存储.每 30 秒 WALL_CLOCK_TIME,punctuate() 对该列表进行排序并刷新到接收器.假设只有单个分区源和接收器.需要EOS处理保证.我知道在任何给定的时间要么 process() 被执行,要么 punctuate() 被执行.我担心这个缓冲区不受变更日志主题的支持.理想情况下,我认为这应该是支持 EOS 的国家商店.但是有一个论点是将 commit.interval 设置为超过 30 秒 - 即 40 秒,将确保缓冲区中的事件永远不会丢失.而且由于我们使用的是 WALL_CLOCK_TIME,punctuate() 将始终每 30 秒调用一次,无论我们是否有事件.这是一个有效的论点吗?这里有哪些情况会导致缓冲区中的事件永远丢失?@Override公共无效初始化(处理器上下文处理器上下文){super.init(processorContext);this.buffer = new ArrayList();context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);}无效刷新(长时间戳){LOG.info(标点符号调用.....");buffer.stream().sorted(Comparator.comparing(o -> o.getId())).forEach(我 ->context().forward(i.getId(), i));}@覆盖公共无效流程(字符串键,客户值){LOG.info("Processing {}", key);缓冲区.添加(值);} 解决方案 我有点想反对调整提交和标点间隔并称此设置万无一失.来自文档,WALL_CLOCK_TIME这只是尽力而为,因为它的粒度受处理循环的迭代需要完成有可能错过"标点符号如果:与PunctuationType#WALL_CLOCK_TIME,GC 暂停,间隔太短理想:标点符号:|-------20s-------|-------20s-------|------20s-------|------20s------|来吧:|------------30s------------|------------30s----------|------------30s---Say process() 花费了太多时间(比如 18 秒),所以 punctuate() 没有在第 40 秒的第二次运行中被调用 - 因为正如文档所提到的,间隔太短.现在在第 31 秒,如果应用程序崩溃,即使启用了 eos,缓冲区中的事件也会在源代码处提交.重新启动时,缓冲区将丢失.标点符号:|-------20s-------|------process()---------20s-------|------20s------|来吧:|------------30s------------|------------30s------------|------------30s---因此,调整提交和标点间隔将抑制对状态存储的需求是无效的论点.A custom processor which buffers events in a simple java.util.List in process() - this buffer is not a state store.Every 30 seconds WALL_CLOCK_TIME, punctuate() sorts this list and flushes to the sink. Assume only single partition source and sink. EOS processing guarantee is required.I know that at any given time either process() gets executed or punctuate() gets executed.I am concerned about this buffer not being backed by changelog topic. Ideally I believe this should have been a state store to support EOS.But there is an argument that setting commit.interval to more than 30 seconds - i.e. say 40 seconds, will make sure that the events in the buffer would never be lost. And also since we are using WALL_CLOCK_TIME, the punctuate() will always be called every 30 seconds regardless of whether we have events are not.Is this a valid argument? What are the cases here that will make the events in the buffer lost forever?@Overridepublic void init(ProcessorContext processorContext) { super.init(processorContext); this.buffer = new ArrayList<>(); context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);}void flush(long timestamp){ LOG.info("Punctuator invoked....."); buffer.stream().sorted(Comparator.comparing(o -> o.getId())).forEach( i -> context().forward(i.getId(), i) );}@Overridepublic void process(String key, Customer value) { LOG.info("Processing {}", key); buffer.add(value);} 解决方案 I sort of figured few arguments against tuning commit and punctuate interval and calling this setup foolproof.From docs, on WALL_CLOCK_TIMEThis is best effort only as its granularity is limited by how long aniteration of the processing loop takes to completeIt's possible to "miss" a punctuation if: withPunctuationType#WALL_CLOCK_TIME, on GC pause, too short intervalIdeal :punctuate :|-------20s-------|-------20s-------|------20s-------|------20s------|c o m m it :|------------30s------------|------------30s-----------|------------30s---Say process() took too much time (say 18 seconds) so punctuate() was not invoked for the second run at 40th second - because as doc mentioned, too short interval.Now at 31st second, if the application crashes, even with eos enabled, events in buffer would have been committed at source. At restart, the buffer would be lost.punctuate :|-------20s-------|------process()---------20s-------|------20s------|c o m m it :|------------30s------------|------------30s-------------|------------30s---Hence it is not valid argument that tuning commit and punctuate interval would curb the need for state store. 这篇关于我可以依靠 Kafka 流中的内存 Java 集合来通过微调标点和提交间隔来缓冲事件吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!
10-22 15:24