从概念上讲
假设我有一个名为“地址”的主题,其中的键是一个人的名字(字符串),而值是一个人的地址(字符串)。此人的地址的更新将是一条消息,其中包含他们的姓名(作为关键字)和一个新的地址(作为值)。因此,如果我只想要任何一个键的最新值,我想我会创建一个ktable。当我这样做时,这里实际发生了什么? Kafka是否正在创建一个实际上是ktable并截断旧值的新主题?还是我必须为当前地址创建一个新主题?还是完全其他?
几乎
我发现的所有示例和教程都使用不推荐使用的方法,因此我希望有新的东西。我当前的解决方案是阅读这样的主题:
final Properties config = new Properties();
//leaving out all the config.put() for readability
final Consumer<String, String> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Collections.singletonList("addresses"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
//do stuff
}
}
} finally {
consumer.close();
}
这工作了一段时间,但是现在我想要一个有状态的解决方案。有一种简单的方法可以在信息进入时将其粘贴到ktable中吗?我不想过滤它或任何东西,我只想要任何条目来更新状态。提前致谢。
最佳答案
KTable
不会创建新主题。相反,它将源主题上的消息像数据库中的ups一样对待-如果是第一次看到该密钥,就好像插入了一条新记录。如果不是第一次,它将被视为对现有记录的更新。 KTable
可用作Kafka Streams其他部分的输入,您也可以将该状态保存在本地存储中,并在应用程序的其他部分中查询K / V存储。