Problem description
Since I am working with JSON values I haven't set up default serdes.
I process a KStream, consuming it with the necessary String and Product (JSON) serdes, but the next step (a map operation) fails:
val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers

val productSerde: Serde<Product> = Serdes.serdeFrom(
    JsonPojoSerializer<Product>(),
    JsonPojoDeserializer(Product::class.java)
)

builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
    .map { key, value ->
        KeyValue(key, XXX)
    }
    .aggregate(...)
If I remove the map operation the execution goes OK.
I haven't found a way to specify the serdes for the map(). How can it be done?
Error:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
Answer
There are multiple issues:

After you call map() you call groupByKey().aggregate(). This triggers a data repartition, so after map() the data is written into an internal topic for repartitioning. Therefore, you also need to provide the corresponding Serdes within groupByKey().
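A minimal sketch of this, reusing the productSerde and builder from the question (a hypothetical fragment, not a tested drop-in; on Kafka 2.1+ the grouping Serdes are passed via Grouped.with, while older versions use the equivalent Serialized.with):

```kotlin
// The key stays String and the value stays Product after map(),
// so the same Serdes must be supplied for the internal repartition topic.
builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
    .map { key, value ->
        KeyValue(key, value)                       // placeholder transformation
    }
    .groupByKey(Grouped.with(Serdes.String(), productSerde))
    .aggregate(/* initializer, aggregator, Materialized ... */)
```

Without the Grouped (or Serialized) argument, groupByKey() falls back to the default Serdes from the config, which in this setup are unset and resolve to the ByteArraySerializer seen in the stack trace.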
However, because you don't modify the key, you should actually call mapValues() instead, to avoid the unnecessary repartitioning.
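Under the same assumptions, a sketch of the preferable variant; because mapValues() cannot change the key, Kafka Streams knows the partitioning is preserved and skips the repartition topic entirely:

```kotlin
// mapValues() transforms only the value, so no internal repartition topic
// is created between the map step and the aggregation.
builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
    .mapValues { value ->
        value                                      // placeholder transformation
    }
    .groupByKey(Grouped.with(Serdes.String(), productSerde))
    .aggregate(/* initializer, aggregator, Materialized ... */)
```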
Note that you need to provide Serdes for each operator that should not use the default Serde from the config. Serdes are not passed along downstream; they are operator in-place overrides. (Work is in progress for Kafka 2.1 to improve this.)