


Since I am working with JSON values, I haven't set up default serdes.


I process a KStream, consuming it with the necessary String and Product (JSON) serdes, but the next step (a map operation) fails:

val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers

val productSerde: Serde<Product> = Serdes.serdeFrom(JsonPojoSerializer<Product>(), JsonPojoDeserializer(Product::class.java))

builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
    .map { key, value ->
        KeyValue(key, XXX)
    }


If I remove the map operation, the application runs without errors.


I haven't found a way to specify the serdes for map(). How can it be done?


Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)



After you call map(), you call groupByKey().aggregate(). Because map() may change the key, this triggers a repartition: the output of map() is written to an internal repartition topic before it is aggregated. Writing to that topic requires Serdes, so you need to provide the corresponding Serdes in groupByKey(), too.
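A minimal sketch of that fix, assuming Kafka Streams 2.1 or later, where Grouped is available (on older versions Serialized.with(...) plays the same role). The Product class and the function name here are hypothetical stand-ins, and the counting aggregate only illustrates where the Serdes go; it is not the aggregation from the question.

```kotlin
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Aggregator
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Initializer
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized

// Hypothetical stand-in for the Product DTO in the question.
data class Product(val name: String, val units: Int)

// Counts records per key. Serdes are given to every operator that writes data:
// Grouped covers the repartition topic, Materialized covers the state store.
fun countPerKeyAfterMap(
    builder: StreamsBuilder,
    topic: String,
    productSerde: Serde<Product>
): KTable<String, Long> =
    builder.stream(topic, Consumed.with(Serdes.String(), productSerde))
        // map() marks the stream for repartitioning, even if the key is unchanged.
        .map { key, value -> KeyValue(key, value) }
        .groupByKey(Grouped.with(Serdes.String(), productSerde))
        .aggregate(
            Initializer { 0L },
            Aggregator<String, Product, Long> { _, _, count -> count + 1 },
            Materialized.with(Serdes.String(), Serdes.Long())
        )
```

The Serdes passed to Grouped describe the data as it leaves map(), while the ones passed to Materialized describe the aggregation result, so the two value Serdes can differ.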

However, because you don't modify the key, you should actually call mapValues() instead, to avoid the unnecessary repartitioning.
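As a rough illustration of the mapValues() variant, again assuming Kafka Streams 2.1 or later and a hypothetical Product class and function name: since the key is untouched, no repartition topic should be created, but the state store of the aggregation still needs Serdes when no defaults are configured.

```kotlin
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Aggregator
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Initializer
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.ValueMapper

// Hypothetical stand-in for the Product DTO in the question.
data class Product(val name: String, val units: Int)

// Sums the units per key. mapValues() keeps the key, so groupByKey()
// does not need to repartition the stream.
fun unitsPerKey(
    builder: StreamsBuilder,
    topic: String,
    productSerde: Serde<Product>
): KTable<String, Long> =
    builder.stream(topic, Consumed.with(Serdes.String(), productSerde))
        .mapValues(ValueMapper<Product, Int> { product -> product.units })
        // Serdes match the value type after mapValues(): String key, Int value.
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        .aggregate(
            Initializer { 0L },
            Aggregator<String, Int, Long> { _, units, total -> total + units },
            Materialized.with(Serdes.String(), Serdes.Long())
        )
```

The explicit ValueMapper type is a design choice for this sketch: it sidesteps any overload ambiguity between ValueMapper and ValueMapperWithKey.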

Note that you need to provide Serdes for each operator that should not use the default Serdes from the config. Serdes are not passed along downstream; each one overrides the default only for the operator where it is specified. (Improving this is work in progress for Kafka 2.1.)
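For the part of the topology where a default does make sense, here is a small sketch of setting a default key Serde in the config. It assumes String keys throughout, and the function name is hypothetical; the JSON value Serde would still be passed per operator, as described above.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

// Builds the Streams config with String as the default key Serde.
// Operators without an explicit key Serde fall back to this default.
fun streamsProps(applicationName: String, kafkaBootstrapServers: String): Properties {
    val props = Properties()
    props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
    props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers
    props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java
    return props
}
```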


10-13 20:40