文章目录
本文概述
文件系统连接器支持流
写入,是基于 Flink 的 文件系统 写入文件的。
我们可以直接编写 SQL,将流数据插入到非分区表。 如果是分区表,可以配置分区操作相关的属性。具体参考分区提交。
一. 滚动策略:sink后文件切分(暂不关注)
1. 切分分区目录下的文件
分区目录下的数据被分割到 part 文件中。每个分区对应的 sink 的 subtask 都至少会为该分区生成一个 part 文件。
该策略基于大小,和指定的文件可被打开的最大 timeout 时长,来滚动 part 文件。
根据描述默认情况下Flink采取了如上默认值的滚动策略。
todo:checkpoint 也会影响part文件的生成
对于 bulk formats 数据 (parquet、orc、avro):滚动策略与 checkpoint 间隔(pending 状态的文件会在下个 checkpoint 完成)控制了 part 文件的大小和个数。
2. 小文件合并
todo: checkpoint的间隔会影响文件产生的效率
file sink 支持文件合并,允许应用程序使用较小的 checkpoint 间隔但不产生大量小文件。
如果启用文件合并功能,会根据目标文件大小,将多个小文件合并成大文件。
在生产环境中使用文件合并功能时,需要注意:
二. 分区提交
sink动态写分区包括如下两个操作:
注意: 分区提交仅在(什么是?)动态分区插入
模式下才有效。
1. 分区提交触发器 (什么时候创建分区)
1.1. 逻辑说明
Flink 提供了两种类型分区提交触发器:
感知分区的几种情况:
- 如果想让下游只有在分区数据完整时才感知到分区,并且 job 中有 watermark 生成,
也能从分区字段的值中提取到时间
:
- 如果想让下游系统只有在数据完整时才感知到分区,但是没有 watermark,或者无法从分区字段的值中提取时间:
延迟数据的处理
:延迟的记录会被写入到已经提交的对应分区中,且会再次触发该分区的提交。
如下参数:
确定何时提交分区:这里只关注process-time trigger
下的两个参数
1.2. 举例说明
--默认值可以不配置
'sink.partition-commit.trigger'='process-time'
--当来第一条数据时(记录为时刻1),先创建hive分区文件夹,当时间超过 时刻1+1h 时,分区提交
--分区未提交时文件为.data开头的临时文件,分区提交时,会从cp中同步数据到临时文件中,并命名为正式文件。
'sink.partition-commit.delay'='1h'
2. 分区时间提取器 (由分区字段来写分区名)
2.1. 逻辑说明
时间提取器从分区字段值中提取时间。
2.2. 举例说明
-- 'year'、'month' 和 'day'三个字段组成分区
-- 可不填,'default'为默认值,即从分区字段中获取
'partition.time-extractor.kind' = 'default'
--具体动态分区名怎么由字段拼接
'partition.time-extractor.timestamp-pattern' = '$year$month$day'
--分区名格式
'partition.time-extractor.timestamp-formatter' = 'yyyyMMdd'
3. 分区提交策略 (分区创建后怎么告知下游或系统)
3.1. 逻辑说明
分区提交策略定义了提交分区时的具体操作。
public class AnalysisCommitPolicy implements PartitionCommitPolicy {
private HiveShell hiveShell;
@Override
public void commit(Context context) throws Exception {
if (hiveShell == null) {
hiveShell = createHiveShell(context.catalogName());
}
hiveShell.execute(String.format(
"ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s = '%s') location '%s'",
context.tableName(),
context.partitionKeys().get(0),
context.partitionValues().get(0),
context.partitionPath()));
hiveShell.execute(String.format(
"ANALYZE TABLE %s PARTITION (%s = '%s') COMPUTE STATISTICS FOR COLUMNS",
context.tableName(),
context.partitionKeys().get(0),
context.partitionValues().get(0)));
}
}
todo:如上通过hive语句来添加分区
3.2. 举例说明
'sink.partition-commit.policy.kind'='success-file'
'sink.partition-commit.success-file.name'='_SUCCESS_gao'
4. Sink Parallelism
在流模式和批模式下,向外部文件系统(包括 hive)写文件时的 parallelism 可以通过相应的 table 配置项指定。
当配置了跟上游的 chained operator 不一样的 parallelism 时,写文件和合并文件的算子(如果开启的话)会使用指定的 sink parallelism。
注意: 目前,当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常。
三. 完整示例
1. 官网(partition-time)
以下示例展示了如何使用文件系统连接器编写流式查询语句,将数据从 Kafka 写入文件系统,然后运行批式查询语句读取数据。
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file'
);
-- 流式 sql,插入文件系统表
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
-- 批式 sql,使用分区修剪进行选择
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
如果 watermark 被定义在 TIMESTAMP_LTZ 类型的列上并且使用 partition-time
模式进行提交,sink.partition-commit.watermark-time-zone
这个属性需要设置成会话时区,否则分区提交可能会延迟若干个小时。
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- 以毫秒为单位的时间
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假设用户配置的时区为 'Asia/Shanghai'
'sink.partition-commit.policy.kind'='success-file'
);
-- 流式 sql,插入文件系统表
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;
-- 批式 sql,使用分区修剪进行选择
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
2. 实际测试(kafka->hive)
-- SET 'table.sql-dialect'='hive';
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'data_base',
'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);
CREATE TABLE source_kafka (
`pv` string,
`uv` string,
`p_day_id` string
) WITH (
'connector' = 'kafka-x'
,'topic' = 'hive_kafka'
,'properties.bootstrap.servers' = 'xxx:9092'
,'properties.group.id' = 'luna_g'
,'scan.startup.mode' = 'earliest-offset'
,'json.timestamp-format.standard' = 'SQL'
,'json.ignore-parse-errors' = 'true'
,'format' = 'json'
,'scan.parallelism' = '1'
);
-- 通过sql hint来指定表的行为
-- 1. 分区名称策略
-- partition.time-extractor.timestamp-pattern'='$p_day_id' :分区数据组成
-- partition.time-extractor.timestamp-formatter' = 'yyyyMMdd' :分区格式
-- 2. 分区提交策略
-- 'sink.partition-commit.delay'='5min':分区提交延迟:分区时间 + 延迟 与 process_time做对比
--3. 通知下游策略
-- 'sink.partition-commit.policy.kind'='metastore,success-file':通知下游策略
-- 'sink.partition-commit.success-file.name'='_SUCCESS_gao' :成功文件名称
insert into
myhive.logsget.dws_thjl_pv_uv_d_xky_bak /*+ OPTIONS('partition.time-extractor.timestamp-pattern'='$p_day_id:00:00','sink.partition-commit.policy.kind'='metastore,success-file','sink.partition-commit.success-file.name'='_SUCCESS_gao111') */
select * from source_kafka;