kafka将删除事件从MySQL流式传输到PostgreSQL

kafka将删除事件从MySQL流式传输到PostgreSQL

本文介绍了通过Apache-kafka将删除事件从MySQL流式传输到PostgreSQL的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Apache Kafka.将事件从MySQL传输到PostgreSQL尽管插入和更新工作正常,但我不知道如何从MySQL删除记录并将该事件传输到.

I am trying to stream events from MySQL to PostgreSQL using Apache Kafka. Although insertions and updates work fine, I can't figure out how to delete a record from MySQL and stream this event to PostgreSQL.

假定以下拓扑:

               +-------------+
               |             |
               |    MySQL    |
               |             |
               +------+------+
                      |
                      |
                      |
      +---------------v------------------+
      |                                  |
      |           Kafka Connect          |
      |  (Debezium, JDBC connectors)     |
      |                                  |
      +---------------+------------------+
                      |
                      |
                      |
                      |
              +-------v--------+
              |                |
              |   PostgreSQL   |
              |                |
              +----------------+

我正在使用以下docker镜像;

I am using the following docker images;

  1. Apache-Zookeper
  2. Apache-Kafka
  3. Debezium/JDBC连接器
  1. Apache-Zookeper
  2. Apache-Kafka
  3. Debezium/JDBC connectors

然后

# Start the application
export DEBEZIUM_VERSION=0.6
docker-compose up

# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

这是MySQL数据库的内容;

Here is the content of MySQL database;

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | [email protected] |
| 1002 | George     | Bailey    | [email protected]    |
| 1003 | Edward     | Walker    | [email protected]         |
| 1004 | Anne       | Kretchmar | [email protected]    |
+------+------------+-----------+-----------------------+

我们可以验证PostgresSQL的内容是否相同;

And we can verify that the content of PostgresSQL is identical;

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | [email protected]
 Bailey    | 1002 | George     | [email protected]
 Walker    | 1003 | Edward     | [email protected]
 Kretchmar | 1004 | Anne       | [email protected]
(4 rows)

假设我要使用id=1004从MySQL数据库中删除记录;

Assume that I want to delete the record with id=1004 from MySQL database;

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
mysql> delete from customers where id = 1004;


docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | [email protected] |
| 1002 | George     | Bailey    | [email protected]    |
| 1003 | Edward     | Walker    | [email protected]         |
+------+------------+-----------+-----------------------+

尽管记录已从MySQL中删除,但该条目仍显示在PostgresSQL中

Although the record is deleted from MySQL, the entry still appears in PostgresSQL

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'

 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | [email protected]
 Bailey    | 1002 | George     | [email protected]
 Walker    | 1003 | Edward     | [email protected]
 Kretchmar | 1004 | Anne       | [email protected]
(4 rows)

我知道支持软删除,但是是否也可以从PostgresSQL中完全删除该特定条目(通过通过Apache-Kafka从MySQL发送del事件)?

I understand that soft deletes are supported however, is it possible to completely delete that particular entry from PostgresSQL as well (by streaming the del event from MySQL via Apache-Kafka)?

这是source.json文件的内容

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}

这是jdbc-sink.json文件的内容

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

我还尝试设置"pk.mode": "record_key""delete.enabled": "true"(错误修复建议),但此修改似乎无效.

I have also tried to set "pk.mode": "record_key" and "delete.enabled": "true" (bug fix suggestion) but this modification doesn't seem to work.

推荐答案

Confluent JDBC接收器连接器当前不支持删除.有一个待处理的拉取请求(您已链接到它),但是尚未合并.

Deletes are currently not supported by the Confluent JDBC sink connector. There's a pending pull request (you already linked to it), but this hasn't been merged yet.

目前,您可以自己基于该分支构建JDBC接收器连接器,也可以创建一个简单的自定义接收器连接器,该连接器通过在目标数据库上执行相应的DELETE语句来处理逻辑删除事件.

For the time being, you could either build the JDBC sink connector based on that branch yourself, or you create a simple custom sink connector which just handles tombstone events by executing a corresponding DELETE statement on the target database.

这篇关于通过Apache-kafka将删除事件从MySQL流式传输到PostgreSQL的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-04 12:30