当Debezium在kafka connect中作为源运行时,并且如果该目标MySQL DB(Amazon RDS实例)上的一段时间未发生任何更新,那么一段时间后,我最终会遇到错误。

[2018-04-25 21:30:14,526] INFO Step 0: Get all known binlogs from MySQL (io.debezium.connector.mysql.MySqlConnectorTask:310)
[2018-04-25 21:30:14,536] INFO Connector requires binlog file 'mysql-bin-changelog.002640', but MySQL only has mysql-bin-changelog.002663, mysql-bin-changelog.002664, mysql-bin-changelog.002665 (io.debezium.connector.mysql.MySqlConnectorTask:323)
[2018-04-25 21:30:14,536] INFO MySQL has the binlog file 'mysql-bin-changelog.002640' required by the connector (io.debezium.connector.mysql.MySqlConnectorTask:325)
[2018-04-25 21:30:14,536] INFO Stopping MySQL connector task (io.debezium.connector.mysql.MySqlConnectorTask:239)
[2018-04-25 21:30:14,536] INFO WorkerSourceTask{id=swiggy-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-04-25 21:30:14,536] INFO WorkerSourceTask{id=swiggy-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-04-25 21:30:14,536] ERROR WorkerSourceTask{id=swiggy-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at binlog file 'mysql-bin-changelog.002640', pos=470, skipping 4 events plus 0 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:117)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:45)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


当我进入数据库并检查MySQL中的binlog时

mysql> show binary logs;
+----------------------------+-----------+
| Log_name                   | File_size |
+----------------------------+-----------+
| mysql-bin-changelog.002664 |       479 |
| mysql-bin-changelog.002665 |       120 |
+----------------------------+-----------+


mysql> show binlog events;
+----------------------------+-----+-------------+------------+-------------+---------------------------------------------------------------------------------------------------------------------------------+
| Log_name                   | Pos | Event_type  | Server_id  | End_log_pos | Info                                                                                                                            |
+----------------------------+-----+-------------+------------+-------------+---------------------------------------------------------------------------------------------------------------------------------+
| mysql-bin-changelog.002664 |   4 | Format_desc | 1550192458 |         120 | Server ver: 5.6.39-log, Binlog ver: 4                                                                                           |
| mysql-bin-changelog.002664 | 120 | Query       | 1550192458 |         201 | BEGIN                                                                                                                           |
| mysql-bin-changelog.002664 | 201 | Query       | 1550192458 |         391 | use `mysql`; INSERT INTO mysql.rds_heartbeat2(id, value) values (1,1524671965007) ON DUPLICATE KEY UPDATE value = 1524671965007 |
| mysql-bin-changelog.002664 | 391 | Xid         | 1550192458 |         422 | COMMIT /* xid=308462 */                                                                                                         |
| mysql-bin-changelog.002664 | 422 | Rotate      | 1550192458 |         479 | mysql-bin-changelog.002665;pos=4                                                                                                |
+----------------------------+-----+-------------+------------+-------------+---------------------------------------------------------------------------------------------------------------------------------+


题:


为什么Debezium闲置?为什么它在002640文件之后没有从MySQL中读取文件?任何服务都未使用此功能。因此,不可能出现Debezium可以读取之前发生过多写入的情况。
当没有活动发生时,为什么Amazon MySQL RDS删除binlog文件?这是一个测试数据库,只有我在其中插入记录。因此,这里没有外部应用程序活动发生。
有没有一种方法可以恢复Debezium连接器并从MySQL当前可用的时间日志开始处理记录? (如果我可以丢失那些未读的记录,则可以)。
我尝试重新启动作业,删除并添加连接器,但最终总是遇到相同的错误。唯一用于恢复活动的解决方案


删除Kafka Connect的offet主题。
再次删除并添加debezium连接器。
我想要一种不同的方法,因为在生产中,我们将有很多连接器使用相同的偏移量主题。因此删除将是不可能的。

最佳答案

请查看heartbeat.interval.ms config属性-这应避免在流量较高的环境中由Debezium监视的流量较低表的情况。在这种情况下,可能会发生二进制日志被刷新但当前二进制日志坐标未记录在offsets主题中的情况。

关于简历-您可以通过修改偏移量主题从情况中恢复。在这里,您需要为服务器上可用的插件和binlog坐标插入偏移记录。有一个Kafka KIP可以帮助您解决此问题的工具。现在,您需要手动进行操作。

09-27 15:26