通过在Kafka 0.11中的记录(HeadersProducerRecord)中添加ConsumerRecord,在使用Kafka Streams处理主题时是否可以获得这些 header ?在map上调用诸如KStream之类的方法时,它提供了key和记录的value的参数,但是我看不到访问headers的方法。如果我们可以将map替换为ConsumerRecord,那就太好了。

例如

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ...

这样的事情会工作:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...

最佳答案

自Streams API 2.0版以来,可以访问记录头。 (有关详细信息,请参见KIP-244。)

您可以通过给定的“上下文”对象(请参阅https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context)通过Processor API(即,通过transform()transformValues()process())访问记录元数据。

在2.0之前的版本中,上下文仅公开主题,分区,偏移量和时间戳-而不公开那些旧版本中Streams在读取时实际上丢弃的 header 。

但是,元数据在DSL级别上不可用。但是,也正在进行扩展DSL的工作:https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

10-06 09:52