本文介绍了Tombstone 消息没有从 KTable 状态存储中删除记录?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在创建 KTable 处理来自 KStream 的数据.但是当我触发带有键和空负载的墓碑消息时,它不会从 KTable 中删除消息.

I am creating KTable processing data from KStream. But when I trigger a tombstone messages with key and null payload, it is not removing message from KTable.

样品 -

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
                .map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
                .groupByKey()
                reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);

在触发具有空值的消息时,我可以在具有空负载的 testStream 映射函数中进行调试,但它不会删除 KTable 更改日志test-store"上的记录.看起来它甚至没有达到 reduce 方法,不确定我在这里缺少什么.

Upon triggering a message with null value, I can debug in testStream map function with null payload, but it doesn't remove record on KTable change log "test-store". Looks like it doesn't even reach reduce method, not sure what I am missing here.

感谢您对此的任何帮助!

Appreciate any help on this!

谢谢.

推荐答案

reduce()

忽略带有 {@code null} 键或值的记录.

因为, 记录被删除,因此 (genericRecord, v1) ->v1 永远不会执行,也不会将逻辑删除写入存储或更改日志主题.

Because, the <key,null> record is dropped and thus (genericRecord, v1) -> v1 is never executed, no tombstone is written to the store or changelog topic.

对于您想到的用例,您需要使用指示删除"的代理值,例如您的 Avro 记录中的布尔标志.您的reduce 函数需要检查标志并在标志被设置时返回null;否则,它必须定期处理记录.

For the use case you have in mind, you need to use a surrogate value that indicates "delete", for example a boolean flag within your Avro record. Your reduce function needs to check for the flag and return null if the flag is set; otherwise, it must process the record regularly.

更新:

Apache Kafka 2.6 添加了 KStream#toTable() 操作符(通过 KIP-523) 允许将 KStream 转换为 KTable.

Apache Kafka 2.6 adds the KStream#toTable() operator (via KIP-523) that allows to transform a KStream into a KTable.

这篇关于Tombstone 消息没有从 KTable 状态存储中删除记录?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-15 10:48