通过在Kafka 0.11中的记录(Headers和ProducerRecord)中添加ConsumerRecord,在使用Kafka Streams处理主题时是否可以获得这些 header ?在map
上调用诸如KStream
之类的方法时,它提供了key
和记录的value
的参数,但是我看不到访问headers
的方法。如果我们可以将map
替换为ConsumerRecord
,那就太好了。
例如
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
这样的事情会工作:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
最佳答案
自Streams API 2.0版以来,可以访问记录头。 (有关详细信息,请参见KIP-244。)
您可以通过给定的“上下文”对象(请参阅https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context)通过Processor API(即,通过transform()
,transformValues()
或process()
)访问记录元数据。
在2.0之前的版本中,上下文仅公开主题,分区,偏移量和时间戳-而不公开那些旧版本中Streams在读取时实际上丢弃的 header 。
但是,元数据在DSL级别上不可用。但是,也正在进行扩展DSL的工作:https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams