我有以下代码

//Kafka Config setup
Properties props = ...; //setup

List<String> topicList = Arrays.asList({"A", "B", "C"});

StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);

source
.map((k,v) -> { //busy code for mapping data})
.transformValues(new MyGenericTransformer());
.to((k,v,r) -> {//busy code for topic routing});

new KafkaStream(builder.build(), properties).start();


我的问题:当我添加多个要订阅的主题(即上面的A,B,C)时,Kstream代码停止接收记录。

参考文献:https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html

相关文件

public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)

"If multiple topics are specified there is no ordering guarantee for records from different topics."


我想要实现的目标:让一个Kstream(即上面的“源”)使用/处理多个主题。

最佳答案

主题共享相同的密钥吗?


  请注意,必须按键对指定的输入主题进行分区。如果
  不是这种情况,用户有责任重新分区
  任何基于键的操作(例如聚合或联接)之前的数据为
  应用于返回的KStream。


这可能是您的阻止者。

另一个可能的问题可能是使用的消费群体。

08-28 16:02
查看更多