本文介绍了Kafka侦听器并发-如何处理从6个线程激发的停止/空闲事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我每天/每周都有两个卡夫卡听众。Daily的AutoStartup=TRUE并且每周的AutoStartup=False。我有一个终结点,可以停止正在运行的Daily并启动Weekly。一旦Weekly消费完消息,我就等待IDLE事件(设置为1分钟)触发,然后停止Weekly。现在我正在收听《我每天开始的周刊》上的停止活动。现在的问题是我的并发设置为6。所以我得到了6个空闲事件和6个停止事件。我做过如下处理。我想知道这是一种好做法,还是有更好的做法?
我将所有的Daily Stop事件收集到一个ConcurentHashMap中。一旦达到并发计数,这意味着所有6个每日侦听程序线程都将停止,并可以启动每周侦听程序。
private void processDailyStopEvent(ConsumerStoppedEvent event)
{
LOGGER.info("Processing DAILY Stop events");
KafkaMessageListenerContainer source = (KafkaMessageListenerContainer) event.getSource();
ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) event.getContainer(ConcurrentMessageListenerContainer.class);
eventMap.get("dailyStop").add(source.getListenerId());
LOGGER.info("Added ListenerId {} to Map<dailyStop>", source.getListenerId());
if (eventMap.get("dailyStop").size() == container.getConcurrency()) {
LOGGER.info("All DAILY Stop events are captured. Clearing the Map<dailyStop>");
eventMap.get("dailyStop").clear();
LOGGER.info("Starting WEEKLY Consumer now.");
kafkaService.startWeeklyConsumer();
}
}
推荐答案
您拥有的是合理的方法。
或者,您可以只等待并发容器的停止事件(其中源等于容器)-它在所有子容器停止后发布。
这篇关于Kafka侦听器并发-如何处理从6个线程激发的停止/空闲事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!