Problem description
For one of my Kafka Streams apps, I need to use features of both the DSL and the Processor API. My streaming app's flow is:
source -> selectKey -> filter -> aggregate (on a window) -> sink
After aggregation, I need to send a SINGLE aggregated message to the sink, so I define my topology as below:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());
I define a custom StateStore and register it with my processor as below:
public class MyProcessor implements Processor<String, String> {
    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);
    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
            .withKeys(Serdes.String())
            .withValues(invSerde)
            .persistent()
            .build()
            .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // processes all the messages in the state store and sends single aggregate message
    }

    @Override
    public void close() {
        invStore.close();
    }
}
When I run the app, I get a java.lang.NullPointerException:
Exception in thread "StreamThread-18" java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Any idea what's going wrong here?
Recommended answer
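You need to register your store outside of your processor, using StreamsBuilder (or KStreamBuilder in older releases). First create the store, then register it with the StreamsBuilder (KStreamBuilder), and when you add the processor, pass the store name to connect the processor and the store. A store that the processor builds and registers by itself is not part of the topology, so Kafka Streams never initializes it; this is consistent with the trace above, where the NullPointerException is thrown from MeteredKeyValueStore.flush() when the task commits. Inside the processor, do not create the store at all: in init(), look it up with context.getStateStore("invStore") and cast it to KeyValueStore<String, HashMapStore>. Because Kafka Streams manages the store's lifecycle, the processor should also not close it in close().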
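import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();
// create store (invSerde is the Serde<HashMapStore> from the question)
StoreBuilder<KeyValueStore<String, HashMapStore>> storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("invStore"),
        Serdes.String(),
        invSerde);
// register store
builder.addStateStore(storeBuilder);
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name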
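// older API (KStreamBuilder, deprecated since Kafka 1.0):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

KStreamBuilder builder = new KStreamBuilder();
// create store; build() already returns a StateStoreSupplier, so no cast is needed
StateStoreSupplier storeSupplier = Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build();
// register store
builder.addStateStore(storeSupplier);
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name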
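Once the store is wired in, the question's punctuate() is still only a stub ("processes all the messages in the state store and sends single aggregate message"). The drain-and-aggregate step it describes can be sketched without Kafka, under loudly stated assumptions: a java.util.TreeMap stands in for the KeyValueStore, a List<String> stands in for the asker's HashMapStore, and the class name PunctuateSketch and the "key=count" summary format are hypothetical. It only illustrates the shape of the logic, not a Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Kafka-free sketch (hypothetical names): buffer messages per key, then on
// "punctuate" fold ALL buffered entries into ONE aggregate and empty the store.
// TreeMap stands in for KeyValueStore<String, HashMapStore>; List<String>
// stands in for the asker's HashMapStore. Both substitutions are assumptions.
public class PunctuateSketch {

    // Mirrors process(): append the message to the list kept under its key.
    public static void add(TreeMap<String, List<String>> store, String key, String message) {
        store.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
    }

    // Mirrors punctuate(): build a single summary of every buffered entry
    // (hypothetical format "key=count;key=count"), then empty the store.
    public static String drainToAggregate(TreeMap<String, List<String>> store) {
        StringBuilder aggregate = new StringBuilder();
        for (Map.Entry<String, List<String>> entry : store.entrySet()) {
            if (aggregate.length() > 0) {
                aggregate.append(';');
            }
            aggregate.append(entry.getKey()).append('=').append(entry.getValue().size());
        }
        // A real KeyValueStore has no clear(); you would delete each drained key instead.
        store.clear();
        return aggregate.toString();
    }

    public static void main(String[] args) {
        TreeMap<String, List<String>> store = new TreeMap<>();
        add(store, "groupA", "m1");
        add(store, "groupB", "m2");
        add(store, "groupA", "m3");
        System.out.println(drainToAggregate(store)); // groupA=2;groupB=1
        System.out.println(store.isEmpty());         // true
    }
}
```

In an actual processor, the loop would presumably run over store.all(), whose KeyValueIterator is Closeable and should be closed (for example with try-with-resources); drained keys would be removed with store.delete(key), and the single aggregate would be emitted with context.forward(...).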
That concludes this article on how to add a custom StateStore to a Kafka Streams DSL processor; we hope the recommended answer is helpful.