一、背景

在大数据领域,初始阶段业务数据通常被存储于关系型数据库,如MySQL。然而,为满足日常分析和报表等需求,大数据平台采用多种同步方式,以适应这些业务数据的不同存储需求。这些同步存储方式包括离线仓库和实时仓库等,选择取决于业务需求和数据特性。

一项常见需求是,业务使用人员需要大数据分析平台中实时查看业务表数据,示例如下:

  1. [Mysql] 业务数据 - 用户表全量数据:
  1. [Mysql] 2023-06-02 业务数据新增了一名用户,且更改了tom的手机号,此时表数据如下:
  1. [大数据平台] 2023-06-02日业务人员在大数据平台中查看用户表实时数据,期望数据和Mysql业务数据一致,如下:

根据上述需求,我们可以得出需要构建实时表以满足业务数据的实时分析需求。

二、技术架构

为了实现上述需求,我们可以利用实时同步任务将业务数据实时同步至下游的 MPP(Massively Parallel Processing)库,从而构建实时表。结合市场上常见的技术组件,本文选择了实时引擎 FlinkCDC 和 Doris(MPP)库作为实时同步技术架构。整体架构如下:

Flink实时数仓同步:实时表实战详解-LMLPHP

三、实现方式

FlinkCDC 提供了三种实现方式,具体如下:

  1. Flink run jar 模式: 这种模式适用于处理复杂的流数据。当使用简单的 Flink SQL 无法满足复杂业务需求时(例如拉链表等),可以通过编写自定义逻辑的方式,将其打包成 Jar 包并运行。以下是一个示例:
// 示例代码
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    // 配置数据源和处理逻辑
    ...
    
    // 实时任务启动
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
  1. sql脚本模式: bin/sql-client -f file ,这种模式适用于简单的流水任务,例如实时表同步等简单的 ETL 任务。你可以通过编写 SQL 文件并使用 Flink SQL 客户端执行,而无需编写额外的 Java 代码。以下是一个示例:
# 示例 mysql2doris SQL 文件
set 'execution.checkpointing.interval'='30000';

create table mysql_user( 
# ...
) WITH ( 
# ...
);

create table doris_user( 
# ...
) WITH ( 
# ...
);

insert into doris_user select * from mysql_user;

执行如下:

$> bin/sql-client.sh --file /usr/local/flinksql/mysql2doris
  1. FlinkCDC Pipeline: 这是 FlinkCDC 3.0 版本引入的全新功能,旨在通过简单的配置即可实现数据同步,无需编写复杂的 Flink SQL。缺点是需要使用 Flink 版本 1.16 或更高版本。以下是一个示例:
# 示例配置文件
source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4

执行如下:

$> bin/flink-cdc.sh mysql-to-doris.yaml

这三种方式各有优劣,可以根据具体需求和场景选择合适的实现方式。考虑到前几篇 Flink 实时数仓同步相关博客都采用了 Jar 包形式,为了给读者带来不同的体验,本文采用 sql脚本模式 模式来实现背景需求。

四、sql脚本模式 + 实时表实现

4.1、实时表设计

背景需求需要实时查看业务表数据,因此在Doris中设计表结构时采用了Unique数据模型。建表语句如下:

CREATE TABLE `example_user_real`
(
    `id` INT NOT NULL COMMENT '用户id',
    `name` STRING NULL COMMENT '用户昵称',
    `phone` STRING NULL COMMENT '手机号',
    `gender` CHAR(5) NULL COMMENT '用户性别',
    `create_time` DATETIMEV2(0) NULL COMMENT '用户注册时间',
    `update_time` DATETIMEV2(0) NULL COMMENT '用户更新时间'
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT '用户实时表'
DISTRIBUTED BY HASH(id) BUCKETS AUTO;

4.2、实时同步逻辑

  1. 首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  2. 本文采用initial模式同步任务

  3. 编写mysql2doris SQL文件,这里需要注意的是类型转换:由于 mysql2doris 是 Flink SQL 文件,故需要将 mysql type -> flink type 以及 doris type -> flink type,示例如下:

set 'execution.checkpointing.interval'='30000';
set 'state.checkpoints.dir'='file:///home/finloan/flink-1.16.1/checkpoint/mysql2doris';

create table mysql_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` CHAR(5),
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0),
PRIMARY KEY(id) NOT ENFORCED
) WITH ( 
'connector'='mysql-cdc',
'hostname'='10.185.163.177',
'port' = '80',
'username'='rouser',
'password'='123456',
'database-name' = 'database',
'table-name'='user'
);

create table doris_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` STRING,
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0)
) WITH ( 
'password'='password',
'connector'='doris',
'fenodes'='11.113.208.103:8030',
'table.identifier'='database.user',
'sink.label-prefix'='唯一任务标识,每次启动都要唯一',
'username'='username' 
);

insert into doris_user select * from mysql_user;

  1. 执行命令如下:此时任务已经提交到flink 集群,本文中使用的是Flink-Cluster 模式而非yarn模式
$> ./sql-client.sh -f  ~/mysql2doris

Flink SQL> [INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5c683fba8567e65509870a6db4e99fa5
  1. 登录flinkUi界面查看任务,如下所示:

Flink实时数仓同步:实时表实战详解-LMLPHP

  1. 此时Doris 数据如下:
  1. [Mysql]业务数据2023-06-02日新增了一名tony用户,且更改了tom的手机号,此时表数据如下:
  1. 此时Doris 数据如下:

五、总结

本文介绍了实时数仓同步的实操案例,通过 FlinkCDC 和 Doris 实现了实时表的构建和数据同步。在实现过程中,尤其压迫注意数据类型的转换问题,以确保不同数据存储之间的兼容性。

此外要根据具体需求和场景,选择合适的实现方式,本文选择了 sql-client --f file 模式来实现实时表需求,旨在为读者提供了不同的实践体验。

六、相关资料

03-08 12:45