问题描述
现在我们在 Flink 中拥有带有花式窗口的 SQL,我试图将衰减的移动平均线引用为在未来的 Flink 版本中对于 Table API 和 SQL 的可能性".来自他们的 SQL 路线图/预览 2017-03 帖子:
Now we have SQL with fancy windowing in Flink, I'm trying to have the decaying moving average referred by "what will be possible in future Flink releases for both the Table API and SQL." from their SQL roadmap/preview 2017-03 post:
table
.window(Slide over 1.hour every 1.second as 'w)
.groupBy('productId, 'w)
.select(
'w.end,
'productId,
('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)
这是我的尝试(也受到方解石衰减示例的启发)
Here is my attempt (inspired as well by the calcite decaying example):
SELECT
lb_index one_key,
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
SUM(Y *
EXP(
proctime -
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
))
FROM write_position
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
时间是处理时间,我们从 AppendStream 表中创建 write_position 时获得的 proctime 为:
tEnv.registerTable(
"write_position",
tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))
Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'.
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'
我已经尝试将 proctime 转换为我所知道的所有其他类型(试图达到 NUMERIC 的承诺),但我找不到如何使其工作.
我错过了什么吗?proctime 是一种您无法转换的非常特殊的系统更改号"时间吗?如果是这样,仍然必须通过某种方式将其与 HOP_START(proctime,...) 值进行比较.
推荐答案
您可以使用 timestampDiff 减去两个时间点(请参阅 docs).你这样用
You can use timestampDiff to subtract two timepoints (see the docs). You use it like this
TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
其中 timepointunit 可以是 SECOND、MINUTE、HOUR、DAY、MONTH 或 YEAR.
where timepointunit can be SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
我没有在处理时间方面尝试过这个,但它确实适用于事件时间字段,所以希望它会.
I haven't tried this with processing time, but it does work with event time fields, so hopefully it will.
这篇关于Flink SQL 中跳跃窗口上的指数衰减移动平均值:铸造时间的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!