我正在使用Kafka Streams开发PoC。现在,我需要在流使用者中获取偏移量值,并使用它为每个消息生成唯一的 key (topic-offset)->hash
。原因是:生产者是syslog,只有少数具有ID。我无法在使用者中生成UUID,因为在进行重新处理的情况下,我需要重新生成相同的 key 。
我的问题是:org.apache.kafka.streams.processor.ProcessorContext
类公开了一个返回值的.offset()
方法,但是我使用的是KStream而不是Processor,因此我找不到返回相同结果的方法。
有人知道如何从KStream中提取每一行的消费者值(value)吗?
提前致谢。
最佳答案
您可以通过process(...)
,transform(...)
和transformValues(...)
使用混合匹配DSL和Processor API。
它允许您访问当前记录的偏移量,类似于普通的处理器API。在您的情况下,似乎您想使用KStream#transform(...)
。