我正在使用具有以下配置的spring-kafka:

package com.danigu.fancypants.infrastructure;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;

/**
 * @author dani
 */
@Data
@EnableKafka
@Configuration
@Import({KafkaConfigurationProperties.class})
public class KafkaConfiguration {
    @Inject KafkaConfigurationProperties kcp;

    protected Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kcp.getBrokerAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kcp.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    @Bean
    public StringJsonMessageConverter stringJsonMessageConverter(ObjectMapper mapper) {
        return new StringJsonMessageConverter(mapper);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            StringJsonMessageConverter messageConverter) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setMessageConverter(messageConverter);
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        factory.setRetryTemplate(retryTemplate());

        return factory;
    }

    /*
     * Retry template.
     */

    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy();
        policy.setMaxAttempts(3);
        return policy;
    }

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);
        return policy;
    }

    protected RetryTemplate retryTemplate() {
       RetryTemplate template = new RetryTemplate();

       template.setRetryPolicy(retryPolicy());
       template.setBackOffPolicy(backOffPolicy());

       return template;
    }
}

我的听众看起来像这样:
package com.danigu.fancypants.integration.inbound.dress;

import com.danigu.fancypants.integration.inbound.InvalidRequestException;
import com.danigu.fancypants.integration.inbound.dress.payload.DressRequest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.util.Set;

/**
 * @author dani
 */
@Component
public class DressListener {

    @Inject protected Validator validator;

    @KafkaListener(topics = {"${kafka.dressesTopic}"})
    public void onMessage(@Payload DressRequest request, Acknowledgment acknowledgment) {
        assertValidRequest(request);

        System.out.println(request);

        acknowledgment.acknowledge();
    }

    protected void assertValidRequest(DressRequest request) {
        final Set<ConstraintViolation<DressRequest>> violations = validator.validate(request);

        if(!violations.isEmpty()) {
            throw new InvalidRequestException(violations, request);
        }
    }
}

到目前为止,我一直在查看spring-kafka的测试和参考文档,there,这些文档说应该配置适当类型的ErrorHandler,这个test表示我应该在ContainerProperties上进行配置,尽管那只是一个错误处理程序,在我的用例中,我想定义多个(针对不同的有效负载类型),如果可以的话,这可能吗?

另外,有没有办法描述在带注释的侦听器void上使用哪个错误处理程序?

另外,是否有一种方法可以针对每个RecoveryCallback或可能针对每个不同的主题来描述@KafkaListener,或者为此必须具有不同的ListenerContainerFactory

我可能会完全错了,有人可以向我指出正确的方向,我如何以正确的方式为不同的有效负载类型配置多个ErrorHandler

最佳答案

我不确定您所说的“不同的有效负载类型”是什么意思,因为您只有一个@KafkaListener。对于不同的有效负载类型,类级别的@KafkaListener可以在方法级别具有@KafkaHandler

无论如何,每个容器只有一个错误处理程序,因此您需要为每个错误处理程序使用不同的容器工厂(恢复回调也是如此)。

我们最近在errorHandler中的@RabbitListener上添加了spring-amqp ...

/**
 * Set an {@link RabbitListenerErrorHandler} to invoke if the listener method throws
 * an exception.
 * @return the error handler.
 * @since 2.0
 */
String errorHandler() default "";

...因此每个方法都可以在其中拥有自己的错误处理程序。

我们可能会在下一个spring-kafka版本中做类似的事情。但是对于每个@KafkaListener来说仍然只能是一个,因此对于类级别的@KafkaListener不会有帮助。

08-04 08:32