本文介绍了如何通知有关GlobalKTable状态存储的更新?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我仅使用StreamsBuilder的API来构建GlobalKTable,如下所示:

I am using simply API of StreamsBuilder for building a GlobalKTable as this:

Materialized<Long, Category, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.<Long, Category, KeyValueStore<Bytes, byte[]>>as(this.categoryStoreName)
        .withCachingDisabled()
        .withKeySerde(Serdes.Long())
        .withValueSerde(CATEGORY_JSON_SERDE);

return streamsBuilder.globalTable(categoryTopic, materialized);

我希望收到更改的通知.它很少更新,以防万一我想触发缓存失效.卡夫卡这样做的方法是什么?

I would like to be notified by changes of it. It is rarely updated and in case an update I would like to trigger cache invalidation. What is the Kafka way of doing this?

推荐答案

GlobalKTable不支持此功能.但是,您可以使用全局存储"并实现每次更新都会调用的自定义 Processor .

GlobalKTable does not support this. However, you can use a "global store" and implement your custom Processor that will be called for each update.

在内部, GlobalKTable 使用全局存储"并为您提供 Processor 实现.

Internally, a GlobalKTable uses a "global store" and provides the Processor implementation for you.

您可以通过 StreamsBuilder#addGlobalStore()添加全局存储.

You can add a global store via StreamsBuilder#addGlobalStore().

这篇关于如何通知有关GlobalKTable状态存储的更新?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-26 23:20