本文介绍了Kafka Connect SMT ApplyWithSchema 需要结构错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经从 confluent https://github.com/confluentinc/部署了一个样本kafka-connect-insert-uuid 用于添加简单的 UUID 字段,但我收到一个错误,它需要结构.我在 Debezium MySQLConnector 中应用它

I have deployed a sample from confluent https://github.com/confluentinc/kafka-connect-insert-uuid for adding simple UUID field but I am getting an error that it requires struct. I am applying this within Debezium MySQLConnector

  Only Struct objects supported for [adding UUID to record], found: 
  java.lang.String\n\tat org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)

什么是只按原样返回记录的简约 applyWithSchema 方法?我正在尝试调试并需要一个 HelloWorld SMT 没有任何错误,必须应用包括 applyWithSchema 在内的方法

What is a minimalist applyWithSchema method that just returns the record as is? I am trying to debug and need a HelloWorld SMT without any errors that have to apply methods including applyWithSchema

我认为这对于应用程序来说可能是最简单的,但需要 applyWithSchema

I think this is probably most simple for the application but need for applyWithSchema

Override
public R apply(R record) {
    
    return record.newRecord(
            record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            record.valueSchema(), record.value(),
            record.timestamp()
    );
}

Override
public R applyWithSchema(R record) {
    // what is minimal transform here??
}

我现在只需要这些函数可以无错误地运行,因为我只对 record.headers().add() 进行了更改.

I just need these functions to run without error now, as I am making a change to record.headers().add() only.

这是给出错误的 applyWithSchema 方法:

Here is the applyWithSchema method that gives the error:

private R applyWithSchema(R record) {
    // FAILS HERE!
    final Struct value = requireStruct(operatingValue(record), PURPOSE);

    Schema updatedSchema = schemaUpdateCache.get(value.schema());
    if(updatedSchema == null) {
        updatedSchema = makeUpdatedSchema(value.schema()); 

        final Struct updatedValue = new Struct(updatedSchema);

        for (Field field : value.schema().fields()) {
         // updatedValue.put(field.name(), value.get(field));
        }

        //updatedValue.put(fieldName, getRandomUuid());

        return newRecord(record, updatedSchema, updatedValue);
    }

推荐答案

主要问题在于您的连接器转换配置顺序,当您定义它时,连接器将按同步顺序应用这些 SMT 操作.如果您需要自定义 SMT 的结构或本机信息,请确保之前的 SMT 不会改变自然状态.

Mostly the issue is with your connector transformation config order, connector will apply these SMT operations in synchronize order when you are defining it. If you are expecting a struct or a native information for your custom SMT make sure previous SMT won't be mutating the natural state.

"transforms":"SMT1, SMT2, CustomSMT3",

这篇关于Kafka Connect SMT ApplyWithSchema 需要结构错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 07:07