我是Kafka Streams的新手,正在使用1.0.0版。我想从其中一个值为KTable设置新的键。

当使用KStream时,可以通过使用像这样的selectKey()方法来完成。

kstream.selectKey ((k,v) -> v.newKey)

但是,KTable中缺少这种方法。唯一的方法是将给定的KTable转换为KStream。对这个问题有什么想法吗?它改变了反对KTable设计的关键吗?

最佳答案

如果要设置新键,则需要对KTable重新分组:

KTable newTable = table.groupBy(/*put select key function here*/)
                       .aggregate(...);

因为键对于KTable必须是唯一的(与KStream相反),所以需要指定一个聚合函数,该函数将具有相同(新)键的所有记录聚合到一个值中。

09-27 14:54