2019年1月28日,阿里云宣布开源“计算王牌”实时计算平台Blink回馈给ApacheFlink社区。官方称,计算延迟已经降到毫秒级,也就是你在浏览网页的时候,眨了一下眼睛,淘宝、天猫处理的信息已经刷新了17亿次。

技本功丨用短平快的方式告诉你:Flink-SQL的扩展实现-LMLPHP

作为一家对技术有追求、有渴望的公司,怎么少得了为Flink社区做些贡献呢?

技本功丨用短平快的方式告诉你:Flink-SQL的扩展实现-LMLPHP

夫子说

首先,本文所述均基于flink 1.5.4

我们为什么扩展Flink-SQL?

由于Flink 本身SQL语法并不提供在对接输入源和输出目的的SQL语法。数据开发在使用的过程中需要根据其提供的Api接口编写Source和 Sink, 异常繁琐,不仅需要了解FLink 各类Operator的API,还需要对各个组件的相关调用方式有了解(比如kafka,redis,mongo,hbase等),并且在需要关联到外部数据源的时候没有提供SQL相关的实现方式,因此数据开发直接使用Flink编写SQL作为实时的数据分析时需要较大的额外工作量。

我们的目的是在使用Flink-SQL的时候只需要关心做什么,而不需要关心怎么做。不需要过多的关心程序的实现,专注于业务逻辑。

接下来,我们一起来看下Flink-SQL的扩展实现吧!

01扩展了哪些flink相关sql

(1)创建源表语句

技本功丨用短平快的方式告诉你:Flink-SQL的扩展实现-LMLPHP

(2)创建输出表语句

技本功丨用短平快的方式告诉你:Flink-SQL的扩展实现-LMLPHP

(3)创建自定义函数

技本功丨用短平快的方式告诉你:Flink-SQL的扩展实现-LMLPHP

(4)维表关联

技本功丨用短平快的方式告诉你:Flink-SQL的扩展实现-LMLPHP

02各个模块是如何翻译到flink的实现

( 1 ) 如何将创建源表的sql语句转换为flink的operator;

Flink中表的都会映射到Table这个类。然后调用注册方法将Table注册到environment。

StreamTableEnvironment.registerTable(tableName, table);

当前我们只支持kafka数据源。Flink本身有读取kafka 的实现类, FlinkKafkaConsumer09,所以只需要根据指定参数实例化出该对象。并调用注册方法注册即可。

另外需要注意在flink sql经常会需要用到rowtime, proctime, 所以我们在注册表结构的时候额外添加rowtime,proctime。

当需要用到rowtime的使用需要额外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定义watermark主要做两个事情:1:如何从Row中获取时间字段。 2:设定最大延迟时间。

( 2 ) 如何将创建的输出表sql语句转换为flink的operator;

Flink输出Operator的基类是OutputFormat, 我们这里继承的是RichOutputFormat, 该抽象类继承OutputFormat,额外实现了获取运行环境的方法getRuntimeContext(), 方便于我们之后自定义metric等操作。

我们以输出到mysql插件mysql-sink为例,分两部分:

  • 将create table 解析出表名称,字段信息,mysql连接信息。

该部分使用正则表达式的方式将create table 语句转换为内部的一个实现类。该类存储了表名称,字段信息,插件类型,插件连接信息。

  • 继承RichOutputFormat将数据写到对应的外部数据源。

主要是实现writeRecord方法,在mysql插件中其实就是调用jdbc 实现插入或者更新方法。

( 3) 如何将自定义函数语句转换为flink的operator;

Flink对udf提供两种类型的实现方式:

(1)继承ScalarFunction

(2)继承TableFunction

需要做的将用户提供的jar添加到URLClassLoader, 并加载指定的class (实现上述接口的类路径),然后调用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的注册。之后即可使用改定义的udf;

( 4 ) 维表功能是如何实现的?

流计算中一个常见的需求就是为数据流补齐字段。因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全,但是当前flink并未提供join外部数据源的SQL功能。

实现该功能需要注意的几个问题:

(1)维表的数据是不断变化的

在实现的时候需要支持定时更新内存中的缓存的外部数据源,比如使用LRU等策略。

(2)IO吞吐问题

如果每接收到一条数据就串行到外部数据源去获取对应的关联记录的话,网络延迟将会是系统最大的瓶颈。这里我们选择阿里贡献给flink社区的算子RichAsyncFunction。该算子使用异步的方式从外部数据源获取数据,大大减少了花费在网络请求上的时间。

(3)如何将sql 中包含的维表解析到flink operator   

为了从sql中解析出指定的维表和过滤条件, 使用正则明显不是一个合适的办法。需要匹配各种可能性。将是一个无穷无尽的过程。查看flink本身对sql的解析。它使用了calcite做为sql解析的工作。将sql解析出一个语法树,通过迭代的方式,搜索到对应的维表;然后将维表和非维表结构分开。

技本功丨用短平快的方式告诉你:Flink-SQL的扩展实现-LMLPHP

通过上述步骤可以通过SQL完成常用的从kafka源表,join外部数据源,写入到指定的外部目的结构中。

04-28 21:10