我有一个具有以下结构的Flink表:

Id1, Id2, myTimestamp, value


行时间基于myTimestamp

我的以下处理效果很好:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable " +
                "GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");


我想改编先前的代码,例如对于每个窗口,每个Id2仅使用最新记录。因此,尽管我可以按照以下方式更改代码:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable, " +
                "(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
                "WHERE  MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
                "GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");


但是当我这样做时,出现以下错误:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
    at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
    at org.apache.flink.table.api.Table.insertInto(table.scala:1126)


看起来Flink不能“理解”我要加入的两个表是同一表。

我该怎么办?

最佳答案

有几个原因使您的查询无法正常工作。

SELECT
  Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value
FROM
  MyTable,
  (SELECT Id2, MAX(myTimestamp) as latestTimestamp
   FROM MyTable
   GROUP BY Id2
  ) as RecordsLatest
WHERE
  MyTable.Id2 = RecordsLatest.Id2
  AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY
  Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)


有些是由于Flink的限制,有些则是更基本的。


latestTimestamp不再是rowtime属性。这是因为它是经过计算的。一旦在表达式中使用rowtime属性(包括诸如MAX之类的聚合函数),它们就会失去其rowtime属性,而成为常规的TIMESTAMP属性。
内部查询产生一个动态表,该表更新其结果。它不是仅追加表。 Id2的最大时间戳更改后,就需要撤消前一个结果行并插入新的结果行。
由于RecordsLatest是更新表(而不是仅追加表),并且latestTimestamp不是rowtime属性,因此RecordsLatestMyTable的联接是“常规联接”(而不是时间窗)联接),它还会产生更新结果,而不是仅附加结果。常规联接不能产生任何行时间属性,因为不能保证输出行的顺序(这是行时间属性的先决条件,因为它们需要与水印对齐),并且结果将来可能需要删除它们。这导致您看到错误信息。
外部查询的GROUP BY子句需要具有rowtime属性rowtime的仅追加输入表。但是,联接的输出不是仅追加的,而是更新的,并且rowtime属性不能是如前所述的rowtime属性。


不幸的是,解决任务并不简单,但应该可行。

首先,您应该返回一个查询,该查询为每个(Id1, Id2)窗口返回具有最大时间戳的row的值:

SELECT
  Id1, Id2,
  MAX(myTimestamp) AS maxT
  ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
  HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
  MyTable
GROUP BY
  Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)


ValOfMaxT函数是用户定义的聚合函数,用于标识最大时间戳记的值并返回。 rowtime是新的rowtime属性,并且在窗口的结束时间戳记之前1ms。

给定此表,我们将其称为Temp,您可以将下一个查询定义为:


SELECT
  Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
  Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)


该查询仅在Id1TUMBLE窗口上分组。这是一个TUMBLE窗口,因为第一个HOP窗口已经将每个记录分为三个窗口,我们不应该再次这样做。相反,我们将第一个查询的结果分为10个第二个窗口,因为这是第一个查询中HOP窗口的滑动长度。

09-25 16:29