问题描述
我已经从融合的>://://github.com/confluentinc/kafka-connect-insert-uuid 用于添加简单的UUID字段,但出现错误,它要求结构.我正在Debezium MySQLConnector中应用此功能
I have deployed a sample from confluent https://github.com/confluentinc/kafka-connect-insert-uuid for adding simple UUID field but I am getting an error that it requires struct. I am applying this within Debezium MySQLConnector
Only Struct objects supported for [adding UUID to record], found:
java.lang.String\n\tat org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
什么是极简主义的applyWithSchema方法,它仅按原样返回记录?我正在尝试调试,并且需要一个HelloWorld SMT,而没有任何错误,这些错误都必须应用包括applyWithSchema
What is a minimalist applyWithSchema method that just returns the record as is? I am trying to debug and need a HelloWorld SMT without any errors that have to apply methods including applyWithSchema
我认为这可能是最简单的应用程序,但需要applyWithSchema
I think this is probably most simple for the application but need for applyWithSchema
Override
public R apply(R record) {
return record.newRecord(
record.topic(), record.kafkaPartition(),
record.keySchema(), record.key(),
record.valueSchema(), record.value(),
record.timestamp()
);
}
Override
public R applyWithSchema(R record) {
// what is minimal transform here??
}
我只需要这些函数现在就可以正常运行,因为我只更改了record.headers().add().
I just need these functions to run without error now, as I am making a change to record.headers().add() only.
这是applyWithSchema方法,会产生错误:
Here is the applyWithSchema method that gives the error:
private R applyWithSchema(R record) {
// FAILS HERE!
final Struct value = requireStruct(operatingValue(record), PURPOSE);
Schema updatedSchema = schemaUpdateCache.get(value.schema());
if(updatedSchema == null) {
updatedSchema = makeUpdatedSchema(value.schema());
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : value.schema().fields()) {
// updatedValue.put(field.name(), value.get(field));
}
//updatedValue.put(fieldName, getRandomUuid());
return newRecord(record, updatedSchema, updatedValue);
}
推荐答案
问题主要出在您的连接器转换配置顺序上,当您定义连接器时,连接器将以同步顺序应用这些SMT操作.如果您希望自定义SMT具有结构或本机信息,请确保以前的SMT不会改变自然状态.
Mostly the issue is with your connector transformation config order, connector will apply these SMT operations in synchronize order when you are defining it. If you are expecting a struct or a native information for your custom SMT make sure previous SMT won't be mutating the natural state.
"transforms":"SMT1, SMT2, CustomSMT3",
这篇关于Kafka Connect SMT ApplyWithSchema需要结构错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!