问题描述
使用Spring-Integration-Kafka,通过出站通道适配器,我尝试将消息发送到名称为"测试"的主题
Using Spring-Integration-Kafka, With outbound-channel-adapter I am trying to send messages to a topic with name "test"
通过命令行终端,我启动了zookeeper,kafka,并创建了名称为"test"
Through command line terminal, I started zookeeper, kafka and created topic with name "test"
Spring XML配置
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
auto-startup="false"
channel="inputToKafka"
kafka-template="template"
sync="true"
topic="test">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
JUnit测试代码
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:kafka-outbound-context.xml"
})
public class ProducerTest{
@Autowired
@Qualifier("inputToKafka")
MessageChannel channel;
@Test
public void test_send_message() {
channel.send(MessageBuilder.withPayload("Test Message")
.setHeader(KafkaHeaders.TOPIC, "test").build());
}
}
测试用例成功,并且在调试时我发现channel.send()返回true
The test case succeeds and on debug i find channel.send() returns true
我通过带有以下命令的命令行检查主题,但是在测试主题中没有看到任何消息.
I inspect the topic through command line with below command, but I don't see any message in the test topic.
有人可以为什么我在 test 主题上看不到任何消息吗?
Can somebody why I don't see any messages on my test topic ?
推荐答案
您是否查看过日志?您需要配置键和值序列化器,否则您将获得
Have you looked in the logs? You need to configure key and value serializers, otherwise you'll get
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
使用Java时:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
地图键是 key.serializer
和 value.serializer
.
这篇关于Spring-Integration-Kafka出站通道适配器发送消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!