类型有什么作用, 类型可以提供编译期检查, 避免到运行期才报错.

类型

首先Flink中自己定义了一套类型, 有LogicalTypeDataType两个表示

LogicalType

LogicalType表示的逻辑类型, 并不涉及类型的物理表示, 会包含nullable属性.

还有两个衍生的概念 LogicalTypeFamilyLogicalTypeRoot
LogicalTypeRoot表示的LogicalType的元类型, 例如TIMSTAMP类型可以包含多种精度, 但是多种精度的TIMESTAMP(3)``TIMSTAMP(6)都属于 TIMESTAMP_WITHOUT_TIME_ZONE root类型. 一个LogicalType只属于一个LogicalTypeRoot
LogicalTypeFamily表示的是LogicalTypeRoot的归类, 一个LogicalTypeRoot可以同时属于多个LogicalTypeFamily
例如 这就表示 DECIMAL属于 PREDEFINED, NUMERIC, EXACT_NUMERIC的分类.

DECIMAL(
        LogicalTypeFamily.PREDEFINED,
        LogicalTypeFamily.NUMERIC,
        LogicalTypeFamily.EXACT_NUMERIC),

DataType

DataType就在LogicalType之上添加了类型的物理表示, 可以看到他的成员变量就是由 logicalType + conversionClass组成, conversionClass表示Flink该怎么去读写这个类型.
他主要用在和外部系统交互的时候, 比如我定义一个TIMSTAMP类型, 我可以用java.sql.Timestamp表示, 也可以用 java.time.LocalDateTime来表示, 这时候就可以通过显示的指定 conversionClass来实现, 如果不指定, 就使用LogicalType中的默认的conversionClass.

protected final LogicalType logicalType;

protected final Class<?> conversionClass;

例如在表示Row类型时, Flink内部都是表示为RowData, 并且内部的String都是表示为StringData, 那么通过LogicalTypeUtils#toInternalConversionClass就可以将相应的conversionClass转化成内部的表示. 并 DataType#bridgeTo用来修改此DataType的物理类型表示.
当然并不是随便bridge一个类型就能生效, 他会通过LogicalType supportsInputConversionsupportsOutputConversion来校验你设置的conversionClass在不在这个范围之内
比如 VarcharType所支持的输入输出类型是

String.class.getName(), byte[].class.getName(), StringData.class.getName()

主要需要校验的点就是Source/LookupJoin/Sink/UDF 这些和外部用户定义类型需要交互的地方.
在 DynamicTableSource.Context中的 createDataStructureConverter就是将外部系统中的数据转化成内部的数据, 而这里首先就需要表示这些外部数据类型的conversionClass是满足supportsInputConversion的 然后通过DataStructureConverters.getConverter(producedDataType)创建外部Object到内部类型的映射, 这样外部Java类型系统就和SQL中的RowData关联上了.
举个例子

putConverter(
        LogicalTypeRoot.VARCHAR, String.class, constructor(StringStringConverter::new));
putConverter(
        LogicalTypeRoot.VARCHAR, byte[].class, constructor(StringByteArrayConverter::new));
putConverter(LogicalTypeRoot.VARCHAR, StringData.class, identity());

这里对于VARCHAR类型, 如果你在插件端中的实际Java object类型是String.class那么就通过StringStringConverter转化成内部的StringData类型. 如果是byte[].class那通过StringByteArrayConverter来转化, 输出侧也是一样.
对于UDF呢也是如此, 不过是通过Codegen的方式来生成converter, 将用户UDF中的外部类型, 转化成内部类型. 如果用户类型是非标准SQL的类型, 也支持指定成RAW来表示成 RawValueData

RowData

RowData是Flink内部的数据类型, 可以看到上面, toInternal转化的时候就是要将外部的ROW 转成RowData, 他和SQL类型的映射关系, 这可以理解成数据类型的内部物理表示层. 内部SQL算子, Codegen代码都是面向RowData编程, 而RowData 也有多种实现有基于Object的GenericRowData, 有基于二进制表示的BinaryRowData, 也有基于列式视图的 ColumnarRowData

 * +--------------------------------+-----------------------------------------+
 * | SQL Data Types                 | Internal Data Structures                |
 * +--------------------------------+-----------------------------------------+
 * | BOOLEAN                        | boolean                                 |
 * +--------------------------------+-----------------------------------------+
 * | CHAR / VARCHAR / STRING        | {@link StringData}                      |
 * +--------------------------------+-----------------------------------------+
 * | BINARY / VARBINARY / BYTES     | byte[]                                  |
 * +--------------------------------+-----------------------------------------+
 * | DECIMAL                        | {@link DecimalData}                     |
 * +--------------------------------+-----------------------------------------+
 * | TINYINT                        | byte                                    |
 * +--------------------------------+-----------------------------------------+
 * | SMALLINT                       | short                                   |
 * +--------------------------------+-----------------------------------------+
 * | INT                            | int                                     |
 * +--------------------------------+-----------------------------------------+
 * | BIGINT                         | long                                    |
 * +--------------------------------+-----------------------------------------+
 * | FLOAT                          | float                                   |
 * +--------------------------------+-----------------------------------------+
 * | DOUBLE                         | double                                  |
 * +--------------------------------+-----------------------------------------+
 * | DATE                           | int (number of days since epoch)        |
 * +--------------------------------+-----------------------------------------+
 * | TIME                           | int (number of milliseconds of the day) |
 * +--------------------------------+-----------------------------------------+
 * | TIMESTAMP                      | {@link TimestampData}                   |
 * +--------------------------------+-----------------------------------------+
 * | TIMESTAMP WITH LOCAL TIME ZONE | {@link TimestampData}                   |
 * +--------------------------------+-----------------------------------------+
 * | INTERVAL YEAR TO MONTH         | int (number of months)                  |
 * +--------------------------------+-----------------------------------------+
 * | INTERVAL DAY TO MONTH          | long (number of milliseconds)           |
 * +--------------------------------+-----------------------------------------+
 * | ROW / structured types         | {@link RowData}                         |
 * +--------------------------------+-----------------------------------------+
 * | ARRAY                          | {@link ArrayData}                       |
 * +--------------------------------+-----------------------------------------+
 * | MAP / MULTISET                 | {@link MapData}                         |
 * +--------------------------------+-----------------------------------------+
 * | RAW                            | {@link RawValueData}                    |
 * +--------------------------------+-----------------------------------------+

TypeInformation

Flink还有一套比较早的类型系统 基于TypeInfomation, 用于兼容较早的Source/Sink 以及UDF的类型接口, 通过LegacyTypeInfoDataTypeConverter将TypeInfomation转化成新的DataType来使用.

RelDataType

以上这三种类型都是Flink中的类型体系, 最终Flink还是通过Calcite来进行sql validate, 而Calcite中的类型是RelDataType, 所以需要有这样的一个映射关系. 通过FlinkTypeFactory#createFieldTypeFromLogicalType可以将LogicalType转化成RelDataType

Validate

首先在validate阶段, calcite系统就会去推断各个字段, 函数入参, 返回值的类型, 来进行语法的校验.

表的schema类型

flink中的catalog和Calcite的CalciteSchema通过CatalogManagerCalciteSchema来打通, 将CatalogManager封装到SimpleCalciteSchema中, 所以最后的表的查找都会通过 FlinkCalciteCatalogReader#getTable路由到
CatalogManagerCalciteSchema -> CatalogCalciteSchema -> DatabaseCalciteSchema 最终是从Catalog中查找到相应的表(CatalogSchemaTable) 这个是Flink Catalog table 和Calcite Table的中间桥接.
而此时就需要将Flink Catalog table 的表类型 (DataType) 转化成 Calcite中的类型RelDataType. 在CatalogSchemaTable#getRowType中就会完成这一转化, 最终源表产出的是一个ROW struct type.

final List<String> fieldNames = schema.getColumnNames();
final List<LogicalType> fieldTypes =
        schema.getColumnDataTypes().stream()
                .map(DataType::getLogicalType)
                .map(PlannerTypeUtils::removeLegacyTypes)
                .collect(Collectors.toList());
return flinkTypeFactory.buildRelNodeRowType(fieldNames, fieldTypes);

函数定义

基于Calcite SqlFunction的定义

首先Flink从Calcite中集成了很多函数定义例如在 FlinkSqlOperatorTable

public static final SqlOperator AND = SqlStdOperatorTable.AND;
public static final SqlOperator AS = SqlStdOperatorTable.AS;
  • SqlOperandTypeInference推断入参的类型 他的入参是 SqlCallBinding, RelDataType returnType, RelDataType[] operandTypes (这个入参都是unknown, 需要实现者去填充). InferTypes内置入参类型推断

  • SqlReturnTypeInference推断函数的返回类型. 入参是SqlOperatorBindingbinding实际上就是把校验时的上下文, 如入参类型, 入参个数, 入参的常量提供给你, 方便你去做类型推断.Calcite ReturnTypes和 FlinkReturnTypes提供了很多预置的返回类型推断的实现

  • SqlOperandTypeChecker 检查入参类型 个数等等是否合法

  • 例如我写一个 SELECT A and B from myTable 测试a和b不是boolean类型. 首先在validate的过程中需要推断SELECT列表的类型, A and B 是一个 SqlOperator 首先会调用上面的SqlOperandTypeInference推导参数类型. 对于AND操作, 推导入参都是boolean
    然后会根据每一个SqlNode#deriveType 推导其返回类型. 而递归查找的过程中最后会查找到source table schema 中找到相应的字段推断出A和B分别是什么类型. 所以感觉第一步的SqlOperandTypeInference并没有什么用 ?
    最后推导出入参类型之后会通过SqlOperandTypeChecker校验入参类型是否符合预期.
    因此当A和B类型不是Boolean时, 通过Checker就会校验报错

基于BuiltInSqlFunction接口

里面所使用的接口其实还是前面 Calcite里面定义的几种推导和校验的接口. BuiltInSqlFunction的接口的主要作用是承接一些 BuiltInFunctionDefinition 不支持的定义方式. 作者希望的主要内置函数的入口还是第三种

基于BuiltInFunctionDefinition

这个就是Flink自己完全新定义了一套类型的dsl. 如 InputTypeStrategy, OutputTypeStrategy. 这个构建出来的Function并不是直接的Calcite的SqlFunction. 最终这些函数都会注册到FunctionCatalogOperatorTable中. 而默认的Calcite初始化的OperatorTable就是 FunctionCatalogOperatorTableFlinkSqlOperatorTable

return SqlOperatorTables.chain(
        new FunctionCatalogOperatorTable(
                context.getFunctionCatalog(),
                context.getCatalogManager().getDataTypeFactory(),
                typeFactory),
        FlinkSqlOperatorTable.instance());

所以查找函数的时候会首先去FunctionCatalogOperatorTable new stack中查找. 查找到的BuiltInFunctionDefinition会通过convertToSqlFunction转化成Calcite的接口. 最终转化成为BridgingSqlFunction, BridgingSqlAggFunction

基于CallExpression

  • CallExpresssion的定义包含了函数的主体 FunctionDefinition, 参数 ResolvedExpression
    • 这一套接口主要可以透传到插件端, 比如Filter下推时下推的就是ResolvedExpression , 以及Watermark表达式
    • 以及在Java scala的table api [[ImplicitExpressionOperations]]中提供dsl api
  • 最后这套CallExpression会通过一系列的转化规则FunctionDefinitionConvertRule DirectConvertRule等等转化成Calcite的RexNode, 用于在Plan阶段将这些边缘节点(用户交互界面)的Expression编译成RexNode. 里面的转化逻辑实际上和上面的三种基本映射

自定义函数

  • 注册用户自定义函数的入口是TableEnvironmentImpl#createSystemFunction 最终实例化成FunctionDefinition
  • 类型校验则是通过 ScalarFunction#getTypeInference 来完成的, 用户实际上也可以覆写这个方法来完成类型的校验, 他也提供了一个默认的实现. 这个默认的实现就是一套基于函数hint的类型推导. FunctionMappingExtractor 完成对自定义函数的分析, 如入参类型, 返回值, 函数名称等等. 这里面用户就可以通过DataTypeHint 来对参数和返回值进行标记. 完成提取后, 用户参数的校验逻辑就和前面的 BuiltInFunctionDefinition 一致了

函数实现

  • 函数实现分为两种Old stack中, 对于Calcite中定义的函数, Flink提供运行时实现基本是通过Codegen代码来生成的, 所以之前新增一个函数比较繁琐, 需要修改Codegen的逻辑, 主要涉及的地方就是 ExprCodeGenerator
  • 而针对New stack的函数, 一般只需要开发者提供一个runtime class, 通过 BridgingSqlFunctionCallGen 完成函数实现的注入.

时间属性类型

  • 时间属性主要是用作窗口时间的特殊标记, 标记是proctime或者eventime. 所以有一种特殊的类型叫做TimeIndicatorRelDataType  这里主要是在源表定义的地方会对时间属性字段做特殊处理插入相应的TimeKind.
  • 需要注意的一点是针对时间属性字段需要做一些物化处理. 在 RelTimeIndicatorConverter 完成 例如
    • 聚合参数或者group维度 中的时间属性字段
    • Sink节点下发前
    • Calc计算节点
08-05 07:50