文章目录
Flink 可以基于几种不同的 时间 概念来处理数据。
本页面说明了如何在 Flink Table API & SQL 里面定义时间以及相关的操作。
一. 时间属性介绍
像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。
时间属性声明
一旦时间属性定义好,就可以像普通列一样使用,也可以在时间相关的操作中使用。
时间属性的传递和物化
注意:
二. Table api指定时间属性
Table API 程序需要在 streaming environment 中指定时间属性:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
三. 处理时间的指定
处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。
共有三种方法可以定义处理时间。
1. 在创建表的 DDL 中定义
处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME() 就可以定义处理时间,函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ 。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
2. 在 DataStream 到 Table 转换时定义
ing
3. 使用 TableSource 定义
ing
四. 事件时间的指定
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。
同样事件时间的指定也有三种方式
1. 在 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。
Flink 支持和在 TIMESTAMP(不带时区) 列和 TIMESTAMP_LTZ(带有本地时区) 列上定义事件时间。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
当源数据中的时间戳数据表示为一个纪元 (epoch) 时间,通常是一个 long 值,例如 1618989564564,此时建议将事件时间属性定义在 TIMESTAMP_LTZ 列上:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
2. 在 DataStream 到 Table 转换时定义
ing
3. 使用 TableSource 定义
ing
五. 小结
本文讨论了flink sql中时间属性的指定方法,其中有几点细节:
参考:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/concepts/time_attributes/#%E5%9C%A8-ddl-%E4%B8%AD%E5%AE%9A%E4%B9%89