Problem description
Suppose we have a transformer (written in Scala):
new Transformer[String, V, (String, V)]() {
  var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
  }

  override def transform(key: String, value: V): (String, V) = {
    val timestamp = toTimestamp(value)
    context.forward(key, value, To.all().withTimestamp(timestamp))
    key -> value
  }

  override def close(): Unit = ()
}
where toTimestamp is just a function that returns a timestamp fetched from the record value. Once it gets executed, there's an NPE:
Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
at CustomTransformer.transform()
at CustomTransformer.transform()
at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:302)
at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:300)
at
What actually happens is that ProcessorContextImpl fails in:
public <K, V> void forward(final K key, final V value, final To to) {
toInternal.update(to);
if (toInternal.hasTimestamp()) {
recordContext.setTimestamp(toInternal.timestamp());
}
final ProcessorNode previousNode = currentNode();
because the recordContext was not initialized (and that can only be done internally by Kafka Streams).
This is a follow-up to the question "Set timestamp in output with Kafka Streams".
Recommended answer
If you work with a transformer, you need to make sure that a new Transformer object is created each time TransformerSupplier#get() is called (cf. https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata).
In the original question, I thought the NPE was caused by your context variable, but I now realize that it is about the Kafka Streams internals.
The Scala API has a bug in 2.0.0 that may cause the same Transformer instance to be reused (https://issues.apache.org/jira/browse/KAFKA-7250). I think you are hitting this bug. Rewriting your code a little should fix the issue. Note that Kafka 2.0.1 and Kafka 2.1.0 contain a fix.
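One possible rewrite, as a rough sketch rather than a definitive fix: wrap the transformer in an explicit TransformerSupplier so that every call to get() builds a fresh object. The class name TimestampTransformerSupplier is hypothetical, and toTimestamp is passed in as a V => Long function because its definition is not shown in the question (the Long return type is an assumption). The sketch uses the Java KeyValue type instead of a Scala tuple, since that is what the Java TransformerSupplier interface expects.

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.{ProcessorContext, To}

// Hypothetical supplier; `toTimestamp` is assumed to return the timestamp as a Long.
class TimestampTransformerSupplier[V](toTimestamp: V => Long)
    extends TransformerSupplier[String, V, KeyValue[String, V]] {

  // Every call builds a new Transformer, so no instance (and no
  // ProcessorContext reference) is shared between callers of get().
  override def get(): Transformer[String, V, KeyValue[String, V]] =
    new Transformer[String, V, KeyValue[String, V]] {
      private var context: ProcessorContext = _

      override def init(context: ProcessorContext): Unit =
        this.context = context

      override def transform(key: String, value: V): KeyValue[String, V] = {
        // Forward the record downstream with the timestamp taken from the value.
        context.forward(key, value, To.all().withTimestamp(toTimestamp(value)))
        // The record has already been forwarded; returning null emits nothing more.
        null
      }

      override def close(): Unit = ()
    }
}
```

On 2.0.0, where the Scala transform() takes a Transformer instance directly, the supplier could presumably be handed to the underlying Java stream instead, for example stream.inner.transform(new TimestampTransformerSupplier[V](toTimestamp)), assuming the Scala KStream wrapper exposes the Java stream as inner. Note that the sketch returns null after forwarding, whereas the code in the question both forwards the record and returns it.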