问题描述
我使用 StreamsBuilder 的简单 API 来构建 GlobalKTable,如下所示:
I am using simply API of StreamsBuilder for building a GlobalKTable as this:
Materialized<Long, Category, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<Long, Category, KeyValueStore<Bytes, byte[]>>as(this.categoryStoreName)
.withCachingDisabled()
.withKeySerde(Serdes.Long())
.withValueSerde(CATEGORY_JSON_SERDE);
return streamsBuilder.globalTable(categoryTopic, materialized);
我希望收到更改通知.它很少更新,如果更新我想触发缓存失效.卡夫卡的做法是什么?
I would like to be notified by changes of it. It is rarely updated and in case an update I would like to trigger cache invalidation. What is the Kafka way of doing this?
推荐答案
GlobalKTable 不支持这个.但是,您可以使用全局存储"并实现您的自定义 Processor
,该Processor
将在每次更新时调用.
GlobalKTable does not support this. However, you can use a "global store" and implement your custom Processor
that will be called for each update.
在内部,GlobalKTable
使用全局存储"并为您提供 Processor
实现.
Internally, a GlobalKTable
uses a "global store" and provides the Processor
implementation for you.
您可以通过 StreamsBuilder#addGlobalStore()
添加全局存储.
You can add a global store via StreamsBuilder#addGlobalStore()
.
这篇关于如何收到有关 GlobalKTable 状态存储更新的通知?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!