Connect无法使用主题策略

Connect无法使用主题策略

本文介绍了Kafka Connect无法使用主题策略的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

上下文

我编码了几个小的 Kafka Connect 连接器.一个每秒仅生成随机数据,另一个每秒将其记录在控制台中.它们与架构注册表集成在一起,因此数据是用 Avro 序列化.

I coded a couple of small Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro.

我使用 fast-data-dev Docker映像将它们部署到本地Kafka环境中Landoop

基本设置有效并每秒产生一条消息,记录下来

The basic setup works and produces a message each second that is logged

但是,我想更改主题名称策略.默认情况下会生成两个主题:

However, I want to change the subject name strategy. The default one generates two subjects:

  • $ {topic}-键
  • $ {topic} -value

根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上.因此,我需要的主题名称是:

As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:

  • $ {topic}-$ {keyRecordName}
  • $ {topic}-$ {valueRecordName}

根据文档,我的需求符合 TopicRecordNameStrategy

我尝试了什么

我创建了 avroData 对象,用于发送值以进行连接:

I create the avroData object for sending values to connect:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

,然后将其用于创建 SourceRecord 响应对象

and use it afterwards for creating the SourceRecord response objects

文档指出,为了使用Kafka Connect中的架构注册表我必须在连接器配置中设置一些属性.因此,当我创建它时,请添加它们:

The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题

连接器似乎忽略了这些属性,并继续使用旧的 $ {topic} -key $ {topic} -value 主题.

The connector seems to ignore those properties and keeps using the old ${topic}-key and ${topic}-value subjects.

问题

Kafka Connect应该支持不同的主题策略.我通过编写自己的 AvroConverter 并硬编码主题策略是我需要的策略.但是,这似乎不是一个好方法,并且在尝试使用Sink Kafka Connector占用数据时也带来了问题.我复制了该主题,因此有一个旧名称的版本( $ {topic} -key ),它可以正常工作

Kafka Connect is supposed to support different subject strategies. I managed to workaround the issue by writing my own version of the AvroConverter and hardcoding that the subject strategy is the one I need. However, this doesn't look like a good approach and also brought issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so there's a version with the old name (${topic}-key) and it works

将主题策略指定给Kafka Connect的正确设置是什么?

What is the proper setup for specyfing the subject strategy to Kafka Connect?

推荐答案

您缺少用于配置的 key.converter value.converter 前缀.传递给对流器.因此,代替:

You're missing the key.converter and value.converter prefix, for the config to be passed through to the conveter. So instead of:

key.subject.name.strategy
value.subject.name.strategy

您想要的:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

来源 https://docs.confluent.io/current/connect/managing/configuring.html :

这篇关于Kafka Connect无法使用主题策略的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-28 03:03