我在春季引导中有kafka处理程序:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }


例如,生产者每秒发送一条消息。但是myService.processResponse工作10秒。我需要处理每个消息并在新线程中启动myService.processResponse。我可以创建我的执行器并将每个响应委派给它。但是我认为kafka中还有其他配置。我发现2:

1)将concurrency = "5"添加到@KafkaListener注释-似乎可以正常工作。但是我不确定如何正确,因为我有第二种方法:

2)我可以创建ConcurrentKafkaListenerContainerFactory并将其设置为ConsumerFactoryconcurrency

我不明白这些方法之间的区别吗?仅将concurrency = "5"添加到@KafkaListener注释就足够了吗,还是我需要创建ConcurrentKafkaListenerContainerFactory

还是我什么都不懂,还有别的办法吗?

最佳答案

使用执行器会使管理偏移量变得复杂。不推荐。

使用@KafkaListener,框架为您创建一个ConcurrentKafkaListenerContainerFactory

注释上的concurrency只是一个方便;它会覆盖出厂设置。

这使您可以将同一工厂与多个侦听器一起使用,每个侦听器具有不同的并发性。

您可以使用引导属性设置容器并发性(默认)。该值被注释值覆盖;见javadocs ...

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";

关于java - 如何在不同线程中处理@KafkaListener方法?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55241976/

10-13 09:11