问题描述
我仅使用StreamsBuilder的API来构建GlobalKTable,如下所示:
I am using simply API of StreamsBuilder for building a GlobalKTable as this:
Materialized<Long, Category, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<Long, Category, KeyValueStore<Bytes, byte[]>>as(this.categoryStoreName)
.withCachingDisabled()
.withKeySerde(Serdes.Long())
.withValueSerde(CATEGORY_JSON_SERDE);
return streamsBuilder.globalTable(categoryTopic, materialized);
我希望收到更改的通知.它很少更新,以防万一我想触发缓存失效.卡夫卡这样做的方法是什么?
I would like to be notified by changes of it. It is rarely updated and in case an update I would like to trigger cache invalidation. What is the Kafka way of doing this?
推荐答案
GlobalKTable不支持此功能.但是,您可以使用全局存储"并实现每次更新都会调用的自定义 Processor
.
GlobalKTable does not support this. However, you can use a "global store" and implement your custom Processor
that will be called for each update.
在内部, GlobalKTable
使用全局存储"并为您提供 Processor
实现.
Internally, a GlobalKTable
uses a "global store" and provides the Processor
implementation for you.
您可以通过 StreamsBuilder#addGlobalStore()
添加全局存储.
You can add a global store via StreamsBuilder#addGlobalStore()
.
这篇关于如何通知有关GlobalKTable状态存储的更新?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!