Question
I am using a third-party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:
{
  "data": {
    "USER_ID": { "string": "1" },
    "USER_CATEGORY": { "string": "A" }
  },
  "beforeData": {
    "Data": {
      "USER_ID": { "string": "1" },
      "USER_CATEGORY": { "string": "B" }
    }
  },
  "headers": {
    "operation": "UPDATE",
    "timestamp": "2018-05-03T13:53:43.000"
  }
}
What configuration is needed in the sink file in order to extract all the (sub)fields under data and headers, and to ignore those under beforeData, so that the target table into which the data is transferred by the Kafka sink contains the following fields:
USER_ID, USER_CATEGORY, operation, timestamp
I went through the transformation list in Confluent's docs, but I was not able to find how to use them to achieve this.
Recommended Answer
If you're willing to list specific field names, you can solve this by:
- Using a Flatten transform to collapse the nesting, which converts the original structure's paths into dot-delimited names (see the sketch after this list)
- Using a ReplaceField transform with renames to make the field names what you want the sink to emit
- Using another ReplaceField transform with whitelist to limit the emitted fields to those you select
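For illustration, assuming the converter has already unwrapped the optional-string unions shown in the example (as an Avro converter would), the record after the Flatten step would look roughly like this:
{
  "data.USER_ID": "1",
  "data.USER_CATEGORY": "A",
  "beforeData.Data.USER_ID": "1",
  "beforeData.Data.USER_CATEGORY": "B",
  "headers.operation": "UPDATE",
  "headers.timestamp": "2018-05-03T13:53:43.000"
}
The beforeData fields survive the Flatten step; they are only dropped later by the whitelist transform.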
For your case it might look like:
"transforms": "t1,t2,t3",
"transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
"transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",