我有以下代码
//Kafka Config setup
Properties props = ...; //setup
List<String> topicList = Arrays.asList({"A", "B", "C"});
StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);
source
.map((k,v) -> { //busy code for mapping data})
.transformValues(new MyGenericTransformer());
.to((k,v,r) -> {//busy code for topic routing});
new KafkaStream(builder.build(), properties).start();
我的问题:当我添加多个要订阅的主题(即上面的A,B,C)时,Kstream代码停止接收记录。
参考文献:https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html
相关文件
public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)
"If multiple topics are specified there is no ordering guarantee for records from different topics."
我想要实现的目标:让一个Kstream(即上面的“源”)使用/处理多个主题。
最佳答案
主题共享相同的密钥吗?
请注意,必须按键对指定的输入主题进行分区。如果
不是这种情况,用户有责任重新分区
任何基于键的操作(例如聚合或联接)之前的数据为
应用于返回的KStream。
这可能是您的阻止者。
另一个可能的问题可能是使用的消费群体。