本文介绍了Kafka Connect SMT ApplyWithSchema需要结构错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经从融合的>://://github.com/confluentinc/kafka-connect-insert-uuid 用于添加简单的UUID字段,但出现错误,它要求结构.我正在Debezium MySQLConnector中应用此功能

I have deployed a sample from confluent https://github.com/confluentinc/kafka-connect-insert-uuid for adding simple UUID field but I am getting an error that it requires struct. I am applying this within Debezium MySQLConnector

  Only Struct objects supported for [adding UUID to record], found: 
  java.lang.String\n\tat org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)

什么是极简主义的applyWithSchema方法,它仅按原样返回记录?我正在尝试调试,并且需要一个HelloWorld SMT,而没有任何错误,这些错误都必须应用包括applyWithSchema

What is a minimalist applyWithSchema method that just returns the record as is? I am trying to debug and need a HelloWorld SMT without any errors that have to apply methods including applyWithSchema

我认为这可能是最简单的应用程序,但需要applyWithSchema

I think this is probably most simple for the application but need for applyWithSchema

Override
public R apply(R record) {
    
    return record.newRecord(
            record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            record.valueSchema(), record.value(),
            record.timestamp()
    );
}

Override
public R applyWithSchema(R record) {
    // what is minimal transform here??
}

我只需要这些函数现在就可以正常运行,因为我只更改了record.headers().add().

I just need these functions to run without error now, as I am making a change to record.headers().add() only.

这是applyWithSchema方法,会产生错误:

Here is the applyWithSchema method that gives the error:

private R applyWithSchema(R record) {
    // FAILS HERE!
    final Struct value = requireStruct(operatingValue(record), PURPOSE);

    Schema updatedSchema = schemaUpdateCache.get(value.schema());
    if(updatedSchema == null) {
        updatedSchema = makeUpdatedSchema(value.schema()); 

        final Struct updatedValue = new Struct(updatedSchema);

        for (Field field : value.schema().fields()) {
         // updatedValue.put(field.name(), value.get(field));
        }

        //updatedValue.put(fieldName, getRandomUuid());

        return newRecord(record, updatedSchema, updatedValue);
    }

推荐答案

问题主要出在您的连接器转换配置顺序上,当您定义连接器时,连接器将以同步顺序应用这些SMT操作.如果您希望自定义SMT具有结构或本机信息,请确保以前的SMT不会改变自然状态.

Mostly the issue is with your connector transformation config order, connector will apply these SMT operations in synchronize order when you are defining it. If you are expecting a struct or a native information for your custom SMT make sure previous SMT won't be mutating the natural state.

"transforms":"SMT1, SMT2, CustomSMT3",

这篇关于Kafka Connect SMT ApplyWithSchema需要结构错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 07:07