一. flink 主键声明语法
主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。
有效性检查
注意: 在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。
sql声明语法:
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
...
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
...
联合主键声明
create table t_sink_01 (
f1 varchar,
f2 varchar,
f3 int,
f4 timestamp(3),
f5 varchar,
primary key(f1,f2) NOT ENFORCED -- 主键声明,字段之间逗号分隔
)
with(
..
) ;
二. 物理表创建联合主键表
CREATE TABLE test003(
id INT(10),
name VARCHAR(25),
age int(10),
PRIMARY KEY(id,name));
desc test003
Field|Type |Null|Key|Default|Extra|
-----+-----------+----+---+-------+-----+
id |int |NO |PRI| | |
name |varchar(25)|NO |PRI| | |
age |int |YES | | | |
三. flink sql使用
CREATE TABLE source
( `id` int,
`username` varchar,
`age` int
) WITH (
'connector' = 'binlog-x'
,'username' = 'root'
,'password' = '11111111'
,'cat' = 'insert,delete,update'
,'url' = 'jdbc:mysql://10.17.31.234:3306/360test'
,'host' = '10.17.31.234'
,'port' = '3306'
-- 什么都不加:最新位置消费
-- 加文件名,从此文件开头消费
,'journal-name' = 'binlog.000194'
-- ,'timestamp'='169944781200'
,'table' = '360test.dimension_table'
,'timestamp-format.standard' = 'SQL'
);
CREATE TABLE sink
( `id` int,
`name` varchar,
`age` int,
PRIMARY KEY (id,name) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://localhost:3306/360test',
'table-name' = 'test003',
'username' = 'root',
'password' = '11111111',
'sink.buffer-flush.max-rows' = '1024', -- 批量写数据条数,默认:1024
'sink.buffer-flush.interval' = '10000', -- 批量写时间间隔,默认:10000毫秒
-- insert时的选项,覆盖或者忽略。
-- 声明了主键时,设置all-replace为true,全部更新覆盖,
-- 或者是忽略,即来的新数据不插入?
'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句
-- sink.all-replace = 'true' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) 。会将所有的数据都替换。
-- sink.all-replace = 'false' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。
-- 新增写入选项:默认会判断,当声明了key则是update
'sink.parallelism' = '1' -- 写入结果的并行度,默认:null
);
insert into sink select id,username as name,age as age from source;