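The documentation for ReplyingKafkaTemplate, which provides request-reply support (introduced in Spring-Kafka 2.1.3), suggests that the request and the reply can have different types: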

ReplyingKafkaTemplate<K, V, R>


Here the type parameter K is the message key, V is the value (i.e. the request), and R is the reply.
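
To make the three type parameters concrete, here is a minimal in-memory sketch. It is not the Spring-Kafka implementation: `TypedRequestReply`, `Foo`, and `Bar` are hypothetical names, and a plain function stands in for the broker and the server. It only illustrates that the request type V and the reply type R can vary independently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class TypedRequestReplySketch {

    /** Hypothetical request payload (plays the role of V). */
    static final class Foo {
        final String value;
        Foo(String value) { this.value = value; }
    }

    /** Hypothetical reply payload (plays the role of R). */
    static final class Bar {
        final String value;
        Bar(String value) { this.value = value; }
    }

    /**
     * Hypothetical stand-in for a request-reply template.
     * K is the key type, V the request type, R the reply type.
     */
    static final class TypedRequestReply<K, V, R> {
        private final Function<V, R> server;

        TypedRequestReply(Function<V, R> server) {
            this.server = server;
        }

        /** Sends a request of type V and returns a future holding a reply of type R. */
        CompletableFuture<R> sendAndReceive(K key, V request) {
            // The key is ignored in this sketch; with a real broker it would
            // typically influence partitioning.
            return CompletableFuture.supplyAsync(() -> server.apply(request));
        }
    }

    /** Sends a Foo and waits for the Bar reply. */
    static Bar demo() {
        TypedRequestReply<String, Foo, Bar> template =
                new TypedRequestReply<>(foo -> new Bar(foo.value.toUpperCase()));
        return template.sendAndReceive("key", new Foo("foo")).join();
    }

    public static void main(String[] args) {
        System.out.println(demo().value);
    }
}
```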

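So far, so good. However, the corresponding support classes for implementing the server side of request-reply do not seem to allow different types for V and R. The documentation suggests using a KafkaListener with an added @SendTo annotation, which behind the scenes uses the ReplyTemplate configured on the MessageListenerContainer. But AbstractKafkaListenerEndpoint supports only a single type for both the listener and the ReplyTemplate: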

public abstract class AbstractKafkaListenerEndpoint<K, V>
        implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {

    ...

    /**
     * Set the {@link KafkaTemplate} to use to send replies.
     * @param replyTemplate the template.
     * @since 2.0
     */
    public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
        this.replyTemplate = replyTemplate;
    }

    ...

}


Consequently, V and R must be the same type.

The example used in the documentation does indeed use String for both the request and the reply.

Am I missing something, or is this a design flaw in Spring-Kafka's request-reply support that should be reported and corrected?

Best answer

This is fixed in the 2.2 release.

For earlier versions, simply inject a raw KafkaTemplate (one declared without generic type arguments).
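
The reason a raw template sidesteps the problem can be sketched in plain Java. `Template` and `Endpoint` below are hypothetical stand-ins, not the Spring-Kafka classes; they only mimic a setter whose parameter is tied to the listener's `<K, V>`. Because generics are erased at runtime, passing the template through a raw type compiles (with an unchecked warning) and still delivers the differently typed reply.

```java
public class RawReplyTemplateSketch {

    /** Hypothetical request payload. */
    static final class Foo {
        final String value;
        Foo(String value) { this.value = value; }
    }

    /** Hypothetical reply payload. */
    static final class Bar {
        final String value;
        Bar(String value) { this.value = value; }
    }

    /** Hypothetical stand-in for a typed template; it only remembers the last value sent. */
    static final class Template<K, V> {
        V lastSent;
        void send(K key, V value) { this.lastSent = value; }
    }

    /** Hypothetical stand-in for an endpoint whose reply template shares the listener's K and V. */
    static final class Endpoint<K, V> {
        private Template<K, V> replyTemplate;

        void setReplyTemplate(Template<K, V> replyTemplate) {
            this.replyTemplate = replyTemplate;
        }

        /** Sends whatever the listener returned; the type arguments no longer exist at runtime. */
        @SuppressWarnings({"rawtypes", "unchecked"})
        void reply(K key, Object result) {
            ((Template) this.replyTemplate).send(key, result);
        }
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object demo() {
        Endpoint<String, Foo> endpoint = new Endpoint<>();
        Template<String, Bar> typed = new Template<>();

        // endpoint.setReplyTemplate(typed);
        // The line above would not compile: Template<String, Bar> is not a Template<String, Foo>.

        Template raw = typed;            // raw type: the compiler skips the generic check
        endpoint.setReplyTemplate(raw);  // accepted with an unchecked warning

        endpoint.reply("key", new Bar("FOO"));
        return typed.lastSent;           // the Bar that was sent as the reply
    }

    public static void main(String[] args) {
        System.out.println(((Bar) demo()).value);
    }
}
```

The trade-off is the usual one for raw types: the compiler can no longer verify that the reply matches what the template's serializer expects, so a mismatch only shows up at runtime.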

EDIT

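import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication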
public class So53151961Application {

    public static void main(String[] args) {
        SpringApplication.run(So53151961Application.class, args);
    }

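    // Server side: consumes Foo requests and returns a Bar. @SendTo without a value
    // sends the reply to the topic named in the request's reply-topic header.
    @KafkaListener(id = "so53151961", topics = "so53151961")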
    @SendTo
    public Bar handle(Foo foo) {
        System.out.println(foo);
        return new Bar(foo.getValue().toUpperCase());
    }

    // Client side: sends Foo requests and receives Bar replies on so53151961-replyTopic.
    @Bean
    public ReplyingKafkaTemplate<String, Foo, Bar> replyingTemplate(ProducerFactory<String, Foo> pf,
            ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {

        ConcurrentMessageListenerContainer<String, Bar> replyContainer =
                factory.createContainer("so53151961-replyTopic");
        replyContainer.getContainerProperties().setGroupId("so53151961.reply");
        ReplyingKafkaTemplate<String, Foo, Bar> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(pf, replyContainer);
        return replyingKafkaTemplate;
    }

    // Server side: the template the listener container factory uses to send the Bar replies.
    @Bean
    public KafkaTemplate<String, Bar> replyTemplate(ProducerFactory<String, Bar> pf,
            ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {

        KafkaTemplate<String, Bar> kafkaTemplate = new KafkaTemplate<>(pf);
        factory.setReplyTemplate(kafkaTemplate);
        return kafkaTemplate;
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Bar> template) {
        return args -> {
            ProducerRecord<String, Foo> record = new ProducerRecord<>("so53151961", null, "key", new Foo("foo"));
            RequestReplyFuture<String, Foo, Bar> future = template.sendAndReceive(record);
            System.out.println(future.get(10, TimeUnit.SECONDS).value());
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so53151961", 1, (short) 1);
    }

    @Bean
    public NewTopic reply() {
        return new NewTopic("so53151961-replyTopic", 1, (short) 1);
    }

    public static class Foo {

        public String value;

        public Foo() {
            super();
        }

        public Foo(String value) {
            this.value = value;
        }

        public String getValue() {
            return this.value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "Foo [value=" + this.value + "]";
        }

    }

    public static class Bar {

        public String value;

        public Bar() {
            super();
        }

        public Bar(String value) {
            this.value = value;
        }

        public String getValue() {
            return this.value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "Bar [value=" + this.value + "]";
        }

    }

}


spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example
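
For readers who build their producer and consumer factories by hand instead of relying on Spring Boot, the properties above correspond to the following Kafka client configuration keys. This is a sketch under that assumption: `KafkaPropsSketch` and its method names are hypothetical, the keys are written as plain strings so the example has no dependencies, and required settings such as `bootstrap.servers` are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaPropsSketch {

    /** Producer side: serialize values as JSON. */
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    /** Consumer side: deserialize JSON values, trusting only classes in com.example. */
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        props.put("enable.auto.commit", false);
        props.put("auto.offset.reset", "earliest");
        // Passed through to the JsonDeserializer; restricts which packages it may instantiate.
        props.put("spring.json.trusted.packages", "com.example");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(consumerProps());
    }
}
```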


Result

Foo [value=foo]
Bar [value=FOO]

On the topic of apache-kafka - spring-kafka request-reply: different types for request and reply, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/53151961/
