Kafka出站通道适配器发送消息

Kafka出站通道适配器发送消息

本文介绍了Spring-Integration-Kafka出站通道适配器发送消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用Spring-Integration-Kafka,通过出站通道适配器,我尝试将消息发送到名称为"测试"的主题

Using Spring-Integration-Kafka, With outbound-channel-adapter I am trying to send messages to a topic with name "test"

通过命令行终端,我启动了zookeeper,kafka,并创建了名称为"test"

Through command line terminal, I started zookeeper, kafka and created topic with name "test"

Spring XML配置

<int:publish-subscribe-channel id="inputToKafka" />

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    kafka-template="template"
                                    sync="true"
                                    topic="test">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

JUnit测试代码

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
        "classpath:kafka-outbound-context.xml"
        })
public class ProducerTest{

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel channel;

    @Test
    public void test_send_message() {

        channel.send(MessageBuilder.withPayload("Test Message")
                .setHeader(KafkaHeaders.TOPIC, "test").build());

    }

}

测试用例成功,并且在调试时我发现channel.send()返回true

The test case succeeds and on debug i find channel.send() returns true

我通过带有以下命令的命令行检查主题,但是在测试主题中没有看到任何消息.

I inspect the topic through command line with below command, but I don't see any message in the test topic.

有人可以为什么我在 test 主题上看不到任何消息吗?

Can somebody why I don't see any messages on my test topic ?

推荐答案

您是否查看过日志?您需要配置键和值序列化器,否则您将获得

Have you looked in the logs? You need to configure key and value serializers, otherwise you'll get

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.

使用Java时:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

地图键是 key.serializer value.serializer .

这篇关于Spring-Integration-Kafka出站通道适配器发送消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-28 07:33