本文介绍了未指定默认 serdes 并使用自定义 serdes 时,对 KStream 的映射操作失败 ->org.apache.kafka.streams.errors.StreamsException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

因为我使用的是 Json 值,所以我没有设置默认的 serdes.

Since I am working with Json values I haven't set up default serdes.

我处理了一个 KStream,使用必要的 spring 和产品 (json) serdes 来使用它,但是下一步(映射操作)失败了:

I process a KStream, consuming it with the necessary spring and product (json) serdes, but the next step (map operation) fails:

val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers

val productSerde: Serde<Product> = Serdes.serdeFrom(JsonPojoSerializer<Product>(), JsonPojoDeserializer(Product::class.java))

builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
            .map { key, value ->
                KeyValue(key, XXX)
            }
            .aggregate(...)

如果我删除地图操作,则执行正常.

If I remove the map operation the execution goes ok.

我还没有找到为 map() 指定 serdes 的方法,怎么做?

I haven't found a way to specify the serdes for the map(), how can it be done?

错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)

推荐答案

多个问题:

  1. 在您调用 map() 之后,您将调用 groupByKey().aggregate().这会触发数据重新分区,因此在 map() 之后将数据写入内部主题以进行数据重新分区.因此,您还需要在groupByKey() 中提供相应的Serde.

  1. After you call map() you call groupByKey().aggregate(). This triggers data repartition and thus after map() data is written into an internal topic for data repartitioning. Therefore, you need to provide corresponding Serdes within groupByKey(), too.

但是,由于您没有修改密钥,您实际上应该调用 mapValues() 来代替,以避免不必要的重新分区.

However, because you don't modify the key, you should actually call mapValues() instead, to avoid the unnecessary repartitioning.

注意,您需要为每个操作员提供 Serde ,这些操作员不应使用配置中的默认 Serde.Serde 不会沿下游传递,而是操作符就地覆盖.(Kafka 2.1 正在改进这一点.)

Note, that you need to provide Serdes for each operator that should not use the default Serde from the config. Serdes are not passed along downstream, but are operator in-place overwrites. (It's work in progress for Kafka 2.1 to improve this.)

这篇关于未指定默认 serdes 并使用自定义 serdes 时,对 KStream 的映射操作失败 ->org.apache.kafka.streams.errors.StreamsException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-13 20:40