问题描述
上下文
我编码了几个小的 Kafka Connect 连接器.一个每秒仅生成随机数据,另一个每秒将其记录在控制台中.它们与架构注册表集成在一起,因此数据是用 Avro 序列化.
I coded a couple of small Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro.
我使用 fast-data-dev Docker映像将它们部署到本地Kafka环境中Landoop
基本设置有效并每秒产生一条消息,记录下来
The basic setup works and produces a message each second that is logged
但是,我想更改主题名称策略.默认情况下会生成两个主题:
However, I want to change the subject name strategy. The default one generates two subjects:
-
$ {topic}-键
-
$ {topic} -value
根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上.因此,我需要的主题名称是:
As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:
-
$ {topic}-$ {keyRecordName}
-
$ {topic}-$ {valueRecordName}
根据文档,我的需求符合 TopicRecordNameStrategy
我尝试了什么
我创建了 avroData
对象,用于发送值以进行连接:
I create the avroData
object for sending values to connect:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
,然后将其用于创建 SourceRecord
响应对象
and use it afterwards for creating the SourceRecord
response objects
文档指出,为了使用Kafka Connect中的架构注册表我必须在连接器配置中设置一些属性.因此,当我创建它时,请添加它们:
The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
问题
连接器似乎忽略了这些属性,并继续使用旧的 $ {topic} -key
和 $ {topic} -value
主题.
The connector seems to ignore those properties and keeps using the old ${topic}-key
and ${topic}-value
subjects.
问题
Kafka Connect应该支持不同的主题策略.我通过编写自己的 AvroConverter
并硬编码主题策略是我需要的策略.但是,这似乎不是一个好方法,并且在尝试使用Sink Kafka Connector占用数据时也带来了问题.我复制了该主题,因此有一个旧名称的版本( $ {topic} -key
),它可以正常工作
Kafka Connect is supposed to support different subject strategies. I managed to workaround the issue by writing my own version of the AvroConverter
and hardcoding that the subject strategy is the one I need. However, this doesn't look like a good approach and also brought issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so there's a version with the old name (${topic}-key
) and it works
将主题策略指定给Kafka Connect的正确设置是什么?
What is the proper setup for specyfing the subject strategy to Kafka Connect?
推荐答案
您缺少用于配置的 key.converter
和 value.converter
前缀.传递给对流器.因此,代替:
You're missing the key.converter
and value.converter
prefix, for the config to be passed through to the conveter. So instead of:
key.subject.name.strategy
value.subject.name.strategy
您想要的:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
来源 https://docs.confluent.io/current/connect/managing/configuring.html :
这篇关于Kafka Connect无法使用主题策略的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!