本文介绍了如何将消息从单个生产者发布到两个主题并在 Kafka 中的单个侦听器中消费的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试从单个生产者发布两个不同主题的消息.
I am Trying to publish messages two different topics from single producer.
这里我创建了两个主题:
@Bean
public NewTopic multi1() {
return TopicBuilder.name("multi1").partitions(1).build();
}
@Bean
public NewTopic multi2() {
return TopicBuilder.name("multi2").partitions(1).build();
}
这是我向两个主题发送消息的方式:
this is how iam sending the messages to two topics:
public void sendingtomultitopic()
{
IntStream.range(0, 100).forEach(i->this.template.send("multi1", "mutli1 data value->"+i));
IntStream.range(0, 100).forEach(i->this.template.send("multi2", "multi2 data value->"+i));
logger.info("sending finished");
}
以上方法是发送到多个主题的正确方法吗?
我在下面尝试使用消息
@KafkaListener(topics = {"multi1,multi2"}, groupId = "diffgroupid3")
public void consumingfrommultitopics(String data) {
logger.info(String.format("consumingfromtwotopics -> %s", data));
}
收到异常:
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [multi1,multi2]
我能够在这两个主题中发布数据.
iam able to the published data in those two topics.
但是这个消费者没有检索任何消息,请在这里帮助我?
推荐答案
您在 'topics' 表单中有一个拼写错误,应该是
You have a typo in 'topics' form, should be
@KafkaListener(topics = {"multi1","multi2"}...
(查看 可以使用单个 Spring 的 KafkaConsumer 侦听器听多个主题?)
你也可以有两个独立的@KafkaListener,每个主题对应一个.什么最适合您的用例.
You can also have two separate @KafkaListener, each for every topic.What works best for your use-case.
这篇关于如何将消息从单个生产者发布到两个主题并在 Kafka 中的单个侦听器中消费的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!