Debezium包

想在代码中自定义监控Oralce,以及其它很多数据库的变更,可以通过Debezium的API。

在我们的项目中,需要引入debezium-api、debezium-embeded这两个包。

比如maven项目:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

另外就是oralce的专用包debezium-connector-oralce:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-oracle</artifactId>
    <version>${version.debezium}</version>
</dependency>

其中version.debezium需要自己做好选择,因为不同的版本,需要的JDK并不一样。

目前,最新的debezium稳定版本是3.0,需要的Java版本至少是17。更早的稳定版本是2.7,需要的Java版本是11。具体信息可以查看这里这张表

为了更久的支持,推荐选用更新的版本。

核心回调函数

使用Debezium的API监控数据库变更,非常简单,核心是使用DebeziumEngine。

DebeziumEngine是一个Runable,使用构建者模式的Builder类来完成构造之后,把它提交给一个Executor即可。

这个DebeziumEngine支持两种回调函数,一个是```

java.util.function.Consumer<R>

,即每次进入回调函数,处理一条记录。

还有一个回调函数,是

 void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;

为了提高处理性能,最好使用这个handleBatch的“批处理”回调函数。

需要注意的是,这俩回调函数没有返回值。Debezium的处理方法是,只要回调函数抛出异常,就整个监听结束。

这个函数的第一个参数records,是一个R的列表,即如果使用Consumer<R>的话那个记录,使用这个handleBatch的话则是记录列表。

第二个参数是一个RecordCommiter,这个是用来控制监控进度的,它有两个主要的方法:

  • markProcessed(R record); 用来标记一条记录已经处理;
  • makrBatchFinished(); 用来表示整个一批记录已经处理。

EmbededEngine的构造

现在最新的DebeziumEngine叫做AsyncDebeziumEngine,相比过去的老Engine,它多了很多功能,比如可以支持并发处理。

构造EmbededEngine的最简单示例为:

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine
        .create(KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),
                "io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory")
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        }).build()
    ) {

    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);

其中,notifying就是注册的回调函数,这里示例用了一个匿名函数,还可以写一个handleBatch的,同样是使用notifying构建进去。

而.create()传进去的KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class)则会在回调函数中,把变更记录的Key、Value还有Header用Json格式输出。

这里支持这几种格式,可以灵活选用:

  • Connect.class - 输出是Kafka连接器的SourceRecord
  • Json.class - 输出是一对JSON字符串
  • JsonByteArray.class - 输出是使用UTF-8编码为byte数组的JSON
  • Avro.class - 参见Avro Serialization
  • CloudEvents.class - 参见Cloud Events

Debezium参数配置

传入进去的Properties,有很多参数,下面通过一个示例来举例说明:

        properties = new Properties();

        // 任务名称
        properties.setProperty("name", "test1");

        // 数据库类型,这里是Oracle
        properties.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");

        // 数据库地址、端口、用户名、密码
        properties.setProperty("database.hostname", "192.168.1.2");
        properties.setProperty("database.port", String.valueOf(1521));
        properties.setProperty("database.user", "usr1");
        properties.setProperty("database.password", "psw1");
        
        // 数据库的DBId
        properties.setProperty("database.dbid", String.valueOf(1111111111));
        // 数据库名
        properties.setProperty("database.dbname", "orcl");

        // 快照模式
        properties.setProperty("snapshot.mode", "initial");

        // 主题
        properties.setProperty("topic.prefix", "test1");

        properties.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
                 properties.setProperty("schema.history.internal.file.filename", "/data/history.dat");

        // 每5秒刷一次偏移量
        properties.setProperty("offset.flush.interval.ms", "5000");
        properties.setProperty("offset.storage.file.filename", "/data/offsets.log");

        // 监控的数据库列表
        properties.setProperty("database.include.list", "orcl");
        // 监控的schema列表
        properties.setProperty("schema.include.list", "usr1");

        // 监控的数据表列表
        properties.setProperty("table.include.list", "usr1.userinfo,usr1.permission");

其中,监控的schema默认是用户名,而监控的数据表则是schema.table这种格式。

在使用过程中,如果遇到内存不够的问题,还可以调整这几个参数:

        // 批量大小设置,这里三个数值是默认值。
        properties.setProperty("log.mining.batch.size.min", String.valueOf(1000));
        properties.setProperty("log.mining.batch.size.max", String.valueOf(100000));
        properties.setProperty("log.mining.batch.size.default", String.valueOf(20000));

Oracle设置

要使用Debezium监控数据库变更,还需要Oracle的服务

  1. Oracle开启补充日志
  2. 打开归档日志模式

开启补充日志比较简单,以下一条命令就可以:

alter database add supplemental log data;

打开归档模式也很简单,但是需要注意需要用户登录为sysdba。

以下命令可以查询是否打开了归档模式:

select log_mode from v$database;

如果显示不是归档日志模式,以下命令可以打开。过程为先连接到sysdba,之后关闭数据库实例,之后用mount模式启动实例(即不打开数据库),然后更改为归档日志模式,然后再打开数据库。

conn system/oracle as sysdba; 
shutdown immediate; 
startup mount; 
alter database archivelog;
alter database open;
11-30 16:26