问题描述
我需要监听来自 Kafka Topic 和 Sink 的事件到 MongoDB 中的集合.消息包含一个带有 id 属性的嵌套对象,就像上面的例子一样.
I need to listening events from a Kafka Topic and Sink to a collection in MongoDB. The message contains an nested object with an id property, like in the example above.
{
"testId": 1,
"foo": "bar",
"foos": [{ "id":"aaaaqqqq-rrrrr" }]
}
我正在尝试使用 RegExp 将此嵌套 id 重命名为 _id
I'm trying to rename this nested id to _id with RegExp
{
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "test",
"connection.uri": "mongodb://mongo:27017",
"database": "test_db",
"collection": "test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"value.projection.list":"testId",
"value.projection.type": "whitelist",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder, com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex",
"field.renamer.regexp": "[{\"regexp\":\"\b(id)\b\", \"pattern\":\"\b(id)\b\",\"replace\":\"_id\"}]"
}
配置/验证的结果是500 Internal Server Error
,带有该消息:
And the result of a config/validate is 500 Internal Server Error
, with that message:
{
"error_code": 500,
"message": null
}
我遗漏了什么还是有问题?
I missing something or is a issue?
推荐答案
我认为你想要的只是 Kafka Connect 单消息转换 (SMT),更准确地说是 ReplaceField
:
I think all you want is Kafka Connect Single Message Transform (SMT) and more precisely ReplaceField
:
过滤或重命名结构或映射中的字段.
以下将用 _id
替换 id
字段名称:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "id:_id"
在您的情况下,在应用上述转换之前,您可能还想Flatten
foos
:
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "."
最后应用转换以重命名字段:
and finally apply the transformation for renaming the field:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "foos.id:foos._id"
这篇关于MongoDB Kafka Sink 连接器不处理 RenameByRegex 处理器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!