本文介绍了Kafka侦听器并发-如何处理从6个线程激发的停止/空闲事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我每天/每周都有两个卡夫卡听众。Daily的AutoStartup=TRUE并且每周的AutoStartup=False。我有一个终结点,可以停止正在运行的Daily并启动Weekly。一旦Weekly消费完消息,我就等待IDLE事件(设置为1分钟)触发,然后停止Weekly。现在我正在收听《我每天开始的周刊》上的停止活动。现在的问题是我的并发设置为6。所以我得到了6个空闲事件和6个停止事件。我做过如下处理。我想知道这是一种好做法,还是有更好的做法?

我将所有的Daily Stop事件收集到一个ConcurentHashMap中。一旦达到并发计数,这意味着所有6个每日侦听程序线程都将停止,并可以启动每周侦听程序。

private void processDailyStopEvent(ConsumerStoppedEvent event)
    {
        LOGGER.info("Processing DAILY Stop events");
        KafkaMessageListenerContainer source = (KafkaMessageListenerContainer) event.getSource();
        ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) event.getContainer(ConcurrentMessageListenerContainer.class);

        eventMap.get("dailyStop").add(source.getListenerId());
        LOGGER.info("Added ListenerId {} to Map<dailyStop>", source.getListenerId());

        if (eventMap.get("dailyStop").size() == container.getConcurrency()) {
            LOGGER.info("All DAILY Stop events are captured. Clearing the Map<dailyStop>");
            eventMap.get("dailyStop").clear();

            LOGGER.info("Starting WEEKLY Consumer now.");
            kafkaService.startWeeklyConsumer();
        }
    }


推荐答案

您拥有的是合理的方法。

或者,您可以只等待并发容器的停止事件(其中源等于容器)-它在所有子容器停止后发布。

这篇关于Kafka侦听器并发-如何处理从6个线程激发的停止/空闲事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-30 14:10