问题描述
我正在尝试为 7 天前的每一行发送记录.这是我正在处理的配置,但它即使查询在数据库服务器上生成记录也不起作用.
I'm trying to send record for every row 7 days old. This is the configuration I was working on but itdoesn't work even though the query produces records on the DB server.
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": 1,
"mode": "bulk",
"connection.url": "jdbc:mysql://mysql:3300/test_db?user=root&password=password",
"query": "SELECT * FROM test_table WHERE DATEDIFF(CURDATE(), test_table.modified) = 7;",
"topic.prefix": "test-jdbc-",
"poll.interval.ms": 10000
}
推荐答案
JDBC 源连接器使用 JDBC 驱动程序将关系数据库中的数据导入到 Apache Kafka 主题中.数据正在定期加载,要么基于时间戳递增,要么批量加载.最初,尽管在创建 JDBC 连接器时使用模式增量或批量,它会在仅加载时间戳列上的新行或修改行之后将所有数据加载到主题中.
The JDBC source connector imports data from the relational database into the Apache Kafka topic by using the JDBC driver.Data is loading periodically either increment based on the timestamp or bulk load. Initially despite mode increment or bulk when you create a JDBC connector it loads all data into the topic after it will load only new or modified rows on the timestamp column.
批量:此模式未经过滤,因此根本不是增量.它将在每次迭代时加载表中的所有行.如果您想定期转储最终删除条目并且下游系统可以安全地处理重复项的整个表,这将非常有用.所以意味着你不能使用批量模式增量加载过去 7 天
Bulk: This mode is unfiltered and therefore not incremental at all. It will load all rows from a table on each iteration. This can be useful if you want to periodically dump an entire table where entries are eventually deleted and the downstream system can safely handle duplicates.So means you can't load the last 7 days incrementally using bulk mode
时间戳列:在此模式下,包含修改时间戳的单个列用于跟踪上次处理数据的时间,并仅查询自那时以来已修改的行.在这里您可以加载增量数据.但是当您第一次创建时它是如何工作的,它将加载数据库表中的所有可用数据,因为对于 JDBC 连接器,这些是新数据.稍后它只会加载新的或修改过的数据.
Timestamp Column: In this mode, a single column containing a modification timestamp is used to track the last time data was processed and to query only for rows that have been modified since that time. Here you can able to load incremental data. But how it works when you create the first time it will load all data available in the database table because for JDBC connector these are new data. Later it will only load new or modified data.
现在,根据您的要求,您似乎正在尝试以某个时间间隔加载所有数据,该时间间隔将配置poll.interval.ms":10000.我看到您的 JDBC 连接设置符合定义,而查询可能不工作尝试使用如下查询.似乎 JDBC 连接器将查询包装为一个表,如果添加 case,则该表不起作用.
Now as per your requirement seems you are trying to load all data at some time interval which is going to configured "poll.interval.ms": 10000. I see your JDBC connect setting is as per definition whereas the query may not work try using the query as below. Seems JDBC connector wrap query as a table which is not working if you add where the case.
"query": "select * from (select * from test_table where modified > now() - interval '7' day) o",
试试下面的设置
{
"name": "application_name",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connection.url": "jdbc:mysql://mysql:3300/test_db",
"connection.user": "root",
"connection.password": "password",
"connection.attempts": "1",
"mode": "bulk",
"validate.non.null": false,
"query": "select * from (select * from test_table where modified > now() - interval '7' day) o",
"table.types": "TABLE",
"topic.prefix": "test-jdbc-",
"poll.interval.ms": 10000
"schema.ignore": true,
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
这篇关于Kafka 连接可以在批量模式下使用自定义查询吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!