我有一个具有以下结构的Flink表:
Id1, Id2, myTimestamp, value
行时间基于
myTimestamp
。我的以下处理效果很好:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable " +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
我想改编先前的代码,例如对于每个窗口,每个
Id2
仅使用最新记录。因此,尽管我可以按照以下方式更改代码:Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable, " +
"(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
"WHERE MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
但是当我这样做时,出现以下错误:
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
看起来Flink不能“理解”我要加入的两个表是同一表。
我该怎么办?
最佳答案
有几个原因使您的查询无法正常工作。
SELECT
Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value
FROM
MyTable,
(SELECT Id2, MAX(myTimestamp) as latestTimestamp
FROM MyTable
GROUP BY Id2
) as RecordsLatest
WHERE
MyTable.Id2 = RecordsLatest.Id2
AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY
Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)
有些是由于Flink的限制,有些则是更基本的。
latestTimestamp
不再是rowtime属性。这是因为它是经过计算的。一旦在表达式中使用rowtime属性(包括诸如MAX
之类的聚合函数),它们就会失去其rowtime属性,而成为常规的TIMESTAMP
属性。内部查询产生一个动态表,该表更新其结果。它不是仅追加表。
Id2
的最大时间戳更改后,就需要撤消前一个结果行并插入新的结果行。由于
RecordsLatest
是更新表(而不是仅追加表),并且latestTimestamp
不是rowtime属性,因此RecordsLatest
和MyTable
的联接是“常规联接”(而不是时间窗)联接),它还会产生更新结果,而不是仅附加结果。常规联接不能产生任何行时间属性,因为不能保证输出行的顺序(这是行时间属性的先决条件,因为它们需要与水印对齐),并且结果将来可能需要删除它们。这导致您看到错误信息。外部查询的
GROUP BY
子句需要具有rowtime属性rowtime
的仅追加输入表。但是,联接的输出不是仅追加的,而是更新的,并且rowtime
属性不能是如前所述的rowtime属性。不幸的是,解决任务并不简单,但应该可行。
首先,您应该返回一个查询,该查询为每个(
Id1, Id2
)窗口返回具有最大时间戳的row的值:SELECT
Id1, Id2,
MAX(myTimestamp) AS maxT
ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
MyTable
GROUP BY
Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)
ValOfMaxT
函数是用户定义的聚合函数,用于标识最大时间戳记的值并返回。 rowtime
是新的rowtime属性,并且在窗口的结束时间戳记之前1ms。给定此表,我们将其称为
Temp
,您可以将下一个查询定义为:
SELECT
Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)
该查询仅在
Id1
和TUMBLE
窗口上分组。这是一个TUMBLE
窗口,因为第一个HOP
窗口已经将每个记录分为三个窗口,我们不应该再次这样做。相反,我们将第一个查询的结果分为10个第二个窗口,因为这是第一个查询中HOP
窗口的滑动长度。