Map operation on a KStream fails with org.apache.kafka.streams.errors.StreamsException when no default Serdes are specified and custom Serdes are used

Problem description

Since I am working with JSON values, I haven't set up default Serdes.

I process a KStream, consuming it with the necessary String and Product (JSON) Serdes, but the next step (the map operation) fails:

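import java.util.Properties
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Consumed

val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers

val productSerde: Serde<Product> = Serdes.serdeFrom(JsonPojoSerializer<Product>(), JsonPojoDeserializer(Product::class.java))

val builder = StreamsBuilder()
builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
    .map { key, value ->
        KeyValue(key, XXX)
    }
    .groupByKey()   // aggregate() is only defined on a grouped stream
    .aggregate(...)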

If I remove the map operation, the execution runs fine.

I haven't found a way to specify the Serdes for map(). How can it be done?

Error:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)

Recommended answer

Several issues:

  1. After you call map(), you call groupByKey().aggregate(). Because map() can change the key, this triggers a repartition: after map(), the data is written into an internal topic for repartitioning. Therefore, you also need to provide the corresponding Serdes in groupByKey().
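A minimal sketch of point 1, assuming Kafka Streams 2.1 or later (where `Grouped` exists; older versions pass `Serialized.with(...)` to `groupByKey()` instead). String values stand in for the question's `Product`, and the topic name `"inventory"`, the function name `buildMapTopology` and the length-summing aggregation are hypothetical. Every operator after `map()` that serializes data gets explicit Serdes: `groupByKey()` for the repartition topic, `aggregate()` (via `Materialized`) for the state store.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Aggregator
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Initializer
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical topology: "inventory" and the String values stand in for the question's
// INVENTORY_TOPIC and Product; no default Serdes are assumed in the config.
fun buildMapTopology(): Topology {
    val builder = StreamsBuilder()
    builder.stream("inventory", Consumed.with(Serdes.String(), Serdes.String()))
        // map() may change the key, so the following grouping requires a repartition.
        .map { key, value -> KeyValue(key, value.length) }
        // Serdes for the internal repartition topic written after map().
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        // Serdes for the aggregation's state store (and its changelog topic).
        .aggregate(
            Initializer<Long> { 0L },
            Aggregator<String, Int, Long> { _, length, total -> total + length },
            Materialized.with<String, Long, KeyValueStore<Bytes, ByteArray>>(Serdes.String(), Serdes.Long())
        )
    return builder.build()
}
```

Calling `buildMapTopology().describe()` should list two sub-topologies: one that writes to the internal repartition topic and one that reads from it and aggregates.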

However, because you don't modify the key, you should actually call mapValues() instead, to avoid the unnecessary repartitioning.
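The same hypothetical pipeline with `mapValues()` can be sketched as follows (again assuming Kafka Streams 2.1+, String values in place of `Product`, and a hypothetical `"inventory"` topic). Since `mapValues()` cannot change the key, no repartition topic is created, and the whole pipeline stays in one sub-topology.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Aggregator
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Initializer
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.ValueMapper
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical topology mirroring the map() sketch, but keeping the key untouched.
fun buildMapValuesTopology(): Topology {
    val builder = StreamsBuilder()
    builder.stream("inventory", Consumed.with(Serdes.String(), Serdes.String()))
        // mapValues() only transforms the value, so the stream is not marked for repartitioning.
        .mapValues(ValueMapper<String, Int> { value -> value.length })
        // Same explicit Serdes as before; here no repartition topic is created.
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        // The state store still needs Serdes for the key and the new aggregate type.
        .aggregate(
            Initializer<Long> { 0L },
            Aggregator<String, Int, Long> { _, length, total -> total + length },
            Materialized.with<String, Long, KeyValueStore<Bytes, ByteArray>>(Serdes.String(), Serdes.Long())
        )
    return builder.build()
}
```

Here `describe()` should report a single sub-topology, which is the unnecessary repartitioning the answer says `mapValues()` avoids.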

Note that you need to provide Serdes for each operator that should not use the default Serdes from the config. Serdes are not passed along downstream; each one is an in-place override for that operator only. (At the time of writing, improving this was work in progress for Kafka 2.1.)
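Operators without their own Serdes fall back to the config defaults, which is the other option the error message mentions ("Change the default Serdes in StreamConfig"). A minimal sketch, assuming String is an acceptable default for both keys and values; the function name `streamsProps` is hypothetical. Any operator that handles `Product` values would still need its own Serdes, as described above.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

// Hypothetical helper: Streams properties whose default Serdes are String instead of the
// ByteArray Serdes reported in the error message.
fun streamsProps(applicationName: String, bootstrapServers: String): Properties {
    val props = Properties()
    props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
    props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    // Used by every operator that is not given its own Serdes.
    props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java
    props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java
    return props
}
```

With String as the default key Serde, key handling in this example would work without overrides, while value Serdes for JSON types still have to be passed per operator.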

10-13 20:40