本文介绍了无法从其他应用程序作为 StateStore 访问 KTable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个 Java 应用程序(App1、App2)来测试如何在 docker 的单个实例环境中从不同的应用程序访问 KTable.

I have two Java Application (App1, App2) to test how to access a KTable from a different app on a single instance environment in docker.

第一个应用程序 (App1) 使用以下代码写入 KTable.

The first App (App1) writes to a KTable with following code.

public static void main(String[] args)
    {
        final Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"gateway-service");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ServiceTransactionSerde.class);


    KStreamBuilder builder = new KStreamBuilder();


    KStream<String,ServiceTransaction> source = builder.stream("gateway_request_processed");


    KStream<String, Long> countByApi = source.groupBy((key,value)-> value.getApiId().toString()).count("Counts").toStream();

    countByApi.to(Serdes.String(), Serdes.Long(),"countByApi");

    countByApi.print();

    final KafkaStreams streams = new KafkaStreams(builder,props);

    streams.start();
    System.out.println(streams.state());

    System.out.println(streams.allMetadata());
    System.out.println(streams.allMetadataForStore("countByApi"));

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {


        @Override
        public void run() {
            System.out.println(streams.allMetadata());
            streams.close();
        }
    }));
}

当我运行我的生产者时,我得到了 App1 中代码的以下输出

When I run my producer I got following output for the code in App1

    RUNNING
[]
[]
[KTABLE-TOSTREAM-0000000006]: c00af5ee-3c2d-4d12-9c4b-3b55c1284dd6, 19

这显示我状态 = RUNNING.商店的元数据也是空的.但是请求被成功处理并存储在KTable中(String,Long).

This shows me state = RUNNING. Metadata are empty also for the store. But the request gets processed and store in the KTable successfully (String,Long).

当我运行 kafka-topics.sh --list --zookeeper:2181我得到以下主题.

When I run kafka-topics.sh --list --zookeeper:2181I get the following topics.

bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
countByApi
gateway-Counts-changelog
gateway-Counts-repartition
gateway-service-Counts-changelog
gateway-service-Counts-repartition
gateway_request_processed

这表明 KTable 以某种方式持久化了新主题.

This shows me that the KTable is somehow persisted with new topics.

然后我有一个带有以下代码的第二个命令行应用程序 (App2),它尝试将此 KTable 作为状态存储 (ReadOnlyKeyValueStore) 访问并访问它.>

I then have a secound command line app (App2) with following code which tries to access this KTable as a state store (ReadOnlyKeyValueStore) and access it.

 public static void main( String[] args )
    {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");


        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams(builder,props);

        streams.cleanUp();
        streams.start();
        System.out.println( "Hello World!" );
        System.out.println(streams.state());

        ReadOnlyKeyValueStore<String,Long> keyValueStore =
        streams.store("countByApi", QueryableStoreTypes.keyValueStore());

        final KeyValueIterator<String,Long> range = keyValueStore.all();

        while(range.hasNext()){
            KeyValue<String,Long> next = range.next();
            System.out.println(String.format("key: %s | value: %s", next.key,next.value));

        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {


                        @Override
                        public void run() {
                            System.out.println(streams.allMetadata());
                            streams.close();
                        }
        }));

    }

当我运行 2. 应用程序时,我确实收到了错误消息:

When I run the 2. App I do get the error message:

    RUNNING
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, countByApi, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
    at com.comp.streamtable.App.main(App.java:37)

不幸的是,我只有 1 个实例,并且我验证状态等于正在运行".

Unfortunatly I do have only 1 instance and I verify that the state is equal "RUNNING".

注意:我必须为每个应用程序选择不同的 application.id,因为这是另一个例外.只是想指出这一点,因为这可能是出于兴趣.

Note: I had to choose different application.id for each app since this thew another Exception. Just wanted to point this out since this might be for interest.

从另一个应用程序访问我的 KTable 时,我错过了什么?

What do I miss here to access my KTable from another app?

推荐答案

您为这两个应用程序使用了两个不同的 application.id.这样,两个应用就完全解耦了.

You are using two different application.id for both applications. Thus, both applications are completely decoupled.

交互式查询专为同一应用的不同实例而设计,不适用于跨应用.

Interactive Queries are designed for different instances of the same app, and do not work across applications.

这篇博文可能有帮助:https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

这篇关于无法从其他应用程序作为 StateStore 访问 KTable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-12 15:55