问题描述
添加(& )在使用Kafka Streams处理主题时是否可以获得这些标题?在 KStream
上调用 map
等方法时,它提供了键的参数
和记录的值
但我无法看到访问标题
。如果我们只能 map
而不是 ConsumerRecord
,那就太好了。
With the addition of Headers to the records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map
on a KStream
it provides arguments of the key
and the value
of the record but no way I can see to access the headers
. It would be nice if we could just map
over the ConsumerRecord
s.
ex。
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
这样的事情会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
推荐答案
自Streams API的2.0版以来,可以访问记录标题。 (参见了解详情。)
Records headers are accessible since versions 2.0 of Streams API. (Cf. KIP-244 for details.)
您可以通过处理器API访问记录元数据(即通过转换()
, transformValues()
,或 process()
),由给定的上下文对象(参见)。
You can access record metadata via Processor API (ie, via transform()
, transformValues()
, or process()
), by the given "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
之前2.0,上下文只公开主题,分区,偏移量和时间戳---但不是那些在旧版本中读取时被Streams删除的标题。
Prior to 2.0, the context only exposes topic, partition, offset, and timestamp---but not headers that are in fact dropped by Streams on read in those older versions.
元数据虽然在DSL级别不可用。但是,还在进行扩展DSL的工作:
Metadata is not available at DSL level though. However, there is also work in progress to extend the DSL: https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
这篇关于是否可以使用Kafka Streams访问邮件头?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!