我对以下拓扑的行为有一些疑问:
String topic = config.topic();
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
.peek((k, v) -> L.info("Event:"+v.action))
// join the event with the according entry in the KTable and apply the state mutation
.leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
.peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
当我同时收到其他事件时,就会发生我的问题。因为我的状态突变由
leftJoin
完成,然后由to
方法写入。如果使用相同的密钥同时收到事件1和2,则可能发生以下情况:event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic
因此,状态Y没有
event1
的更改,因此丢失了数据。这是我所看到的日志(
Processing:...
部分是从值连接器内部记录的):Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1
Event1
可以看作是创建事件:它将在KTable中创建条目,因此状态为空并不重要。 Event2
尽管需要将其更改应用到现有状态,但未找到任何内容,因为第一个状态突变仍未写入KTable(to
方法仍未对其进行处理)无论如何,要确保我的leftJoin和我对ktable的写入是原子完成的?
谢谢
更新和当前解决方案
感谢@Matthias的回应,我得以使用
Transformer
找到解决方案。代码如下所示:
那是变压器
public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {
private final String stateName;
private final ValueJoiner<V1, V2, V2> joiner;
private final boolean updateState;
private KeyValueStore<K, V2> state;
public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
this.stateName = stateName;
this.joiner = joiner;
this.updateState = updateState;
}
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
}
@Override
public KeyValue<K, V2> transform(K key, V1 value) {
V2 stateValue = this.state.get(key); // Get current state
V2 updatedValue = joiner.apply(value, stateValue); // Apply join
if (updateState) {
this.state.put(key, updatedValue); // write new state
}
return new KeyValue<>(key, updatedValue);
}
@Override
public KeyValue<K, V2> punctuate(long timestamp) {
return null;
}
@Override
public void close() {}
}
这是适应的拓扑:
String topic = config.topic();
String store = topic + "-store";
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
// join the event with the according entry in the KTable and apply the state mutation
.transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
当我们使用KTable的KV StateStore并通过
put
方法事件直接在其中应用更改时,shoudl始终会选择更新后的状态。我仍然想知道的一件事:如果我连续不断地处理大量事件,该怎么办?
我们在KTable的KV存储上执行的看跌期权与在KTable的主题中完成的写入之间是否仍存在竞争条件?
最佳答案
KTable
分片到多个物理存储中,每个存储仅由单个线程更新。因此,您描述的情况不会发生。如果您有2个具有相同时间戳的记录,并且它们都更新相同的分片,则它们将一个接一个地处理(以偏移顺序)。因此,第二次更新将看到第一次更新后的状态。
因此,也许您只是描述了不正确的情况?
更新资料
进行连接时,您无法更改状态。因此,期望
event1 joins with state A => state A mutated to state X
是错的。
event1
与state A
联接时,与任何处理顺序无关,它将以只读模式访问state A
,并且state A
不会被修改。因此,当
event2
加入时,它将看到与event1
相同的状态。对于流表联接,仅当从table-input-topic中读取新数据时,才更新表状态。如果要使用两个输入都更新的共享状态,则需要使用
transform()
构建自定义解决方案:builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");
这将创建一个由两个处理器共享的存储,并且两个存储都可以根据需要进行读写。因此,对于表输入,您可以仅更新状态而无需向下游发送任何内容,而对于流输入,您可以进行联接,更新状态并向下游发送结果。
更新2
关于解决方案,在
Transformer
应用于状态的更新之间没有竞争条件,并且在状态更新之后记录Transformer
进程。这部分将在单个线程中执行,并且记录将以与输入主题偏移的顺序进行处理。因此,可以确保状态更新可用于以后的记录。