本文介绍了如何收到有关 GlobalKTable 状态存储更新的通知?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 StreamsBuilder 的简单 API 来构建 GlobalKTable,如下所示:

I am using simply API of StreamsBuilder for building a GlobalKTable as this:

Materialized<Long, Category, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.<Long, Category, KeyValueStore<Bytes, byte[]>>as(this.categoryStoreName)
        .withCachingDisabled()
        .withKeySerde(Serdes.Long())
        .withValueSerde(CATEGORY_JSON_SERDE);

return streamsBuilder.globalTable(categoryTopic, materialized);

我希望收到更改通知.它很少更新,如果更新我想触发缓存失效.卡夫卡的做法是什么?

I would like to be notified by changes of it. It is rarely updated and in case an update I would like to trigger cache invalidation. What is the Kafka way of doing this?

推荐答案

GlobalKTable 不支持这个.但是,您可以使用全局存储"并实现您的自定义 Processor,该Processor 将在每次更新时调用.

GlobalKTable does not support this. However, you can use a "global store" and implement your custom Processor that will be called for each update.

在内部,GlobalKTable 使用全局存储"并为您提供 Processor 实现.

Internally, a GlobalKTable uses a "global store" and provides the Processor implementation for you.

您可以通过 StreamsBuilder#addGlobalStore() 添加全局存储.

You can add a global store via StreamsBuilder#addGlobalStore().

这篇关于如何收到有关 GlobalKTable 状态存储更新的通知?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-26 23:20