Problem description
I am trying to access an in-memory state store that I create within the same Java program, but the call throws: Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, storeName, may have migrated to another instance.
When I use persistentKeyValueStore instead, it works fine: the store is created and the values are returned.
package com.bakdata.streams_store.demo;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
public class InMemoryStore {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-id-0001");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        String storeName = "sample";
        // Register an in-memory key-value store with the topology builder.
        KeyValueBytesStoreSupplier stateStore = Stores.inMemoryKeyValueStore(storeName);
        StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(stateStore, Serdes.String(), Serdes.String());
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(storeBuilder);
        KStream<String, String> inputStream = builder.stream("material_test1");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        try {
            streams.start();
            // Give the application time to start before querying the store.
            Thread.sleep(30000);
        } catch (final Throwable e) {
            System.exit(1);
        }
        // This lookup is where the InvalidStateStoreException is thrown.
        final ReadOnlyKeyValueStore<String, String> keyValueStore = streams.store(storeName, QueryableStoreTypes.keyValueStore());
        KeyValueIterator<String, String> range = keyValueStore.all();
        while (range.hasNext()) {
            KeyValue<String, String> next = range.next();
            System.out.println("Key: " + next.key + ", value: " + next.value);
        }
    }
}
I expect the program to print the values returned by the read-only store query.
You cannot have a StateStore on a stream, because a stream can contain multiple values for a single key. You need to turn it into a KTable (streamsBuilder.table(...)) or a GlobalKTable (streamsBuilder.globalTable(...)) first.
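To make the stream-versus-table distinction concrete, here is a minimal plain-Java model. It does not use the Kafka Streams API: the class name `StreamVsTableSketch`, the method `toTable`, and the sample keys and values are hypothetical and chosen only for illustration. It shows that a sequence of records may carry several values for one key, while a table view keeps exactly one value per key (here, the latest).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of stream vs. table semantics; not the Kafka Streams API.
class StreamVsTableSketch {

    // Folds a sequence of key-value records into table form: a later value
    // for a key overwrites the earlier one, so each key maps to one value.
    static Map<String, String> toTable(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        // The "stream" holds two values for key m1.
        List<Map.Entry<String, String>> stream = List.of(
                Map.entry("m1", "steel"),
                Map.entry("m2", "copper"),
                Map.entry("m1", "aluminium"));

        // The table view keeps only the latest value per key.
        toTable(stream).forEach((key, value) ->
                System.out.println("Key: " + key + ", value: " + value));
    }
}
```

A key-value store can answer "what is the value for m1?" only in the table view, which is why the topic is materialized as a table before it is queried.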
Kotlin example:
val businessObjects = streamsBuilder.globalTable("topic", eventStore("store-name"))
where eventStore is:
fun eventStore(name: String) = Materialized.`as`<String, String>(Stores.inMemoryKeyValueStore(name))
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.String())
After you have started the streams:
var store: ReadOnlyKeyValueStore<String, String> = streams.store("store-name", keyValueStore<String, String>())
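The store lookup can still throw InvalidStateStoreException if it runs before the instance is ready, so one option is to poll until it succeeds. The following is a minimal sketch of such a polling helper in plain Java. The class `StoreLookupRetrySketch` and the method `retryUntilAvailable` are hypothetical names, not part of Kafka Streams, and the demo in `main` uses a stand-in supplier that fails twice instead of a real store.

```java
import java.util.function.Supplier;

// Generic polling helper; hypothetical names, not part of the Kafka Streams API.
class StoreLookupRetrySketch {

    // Calls lookup until it stops throwing, sleeping backoffMillis between
    // attempts. Rethrows the last failure once maxAttempts is exhausted.
    static <T> T retryUntilAvailable(Supplier<T> lookup, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // InvalidStateStoreException is a RuntimeException, so it lands here.
                lastFailure = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis);
                    } catch (InterruptedException interrupted) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while waiting", interrupted);
                    }
                }
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        // Stand-in for a store lookup that fails twice before succeeding.
        int[] calls = {0};
        String handle = retryUntilAvailable(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("store not ready");
            }
            return "store-handle";
        }, 5, 10L);
        System.out.println(handle + " after " + calls[0] + " attempts");
    }
}
```

In a Kafka Streams application the supplier would wrap the lookup itself, for example `() -> streams.store("store-name", QueryableStoreTypes.keyValueStore())`. Catching only InvalidStateStoreException instead of every RuntimeException would be the narrower choice there.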
Note: There is also an interface, KafkaStreams.StateListener, that tells you when the streams are ready:
override fun onChange(newState: KafkaStreams.State?, oldState: KafkaStreams.State?) =
    Option.fromNullable(newState)
        .filter { REBALANCING == oldState && RUNNING == it }
        .map { store = streams.store("store-name", keyValueStore<String, String>()) }
        .getOrElse { log.info("Waiting for Kafka being in REBALANCING -> RUNNING, but it is $oldState -> $newState") }
Alternatively, you could also turn your stream into a KTable with stream.groupByKey().reduce(...), as described here.