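I want to join a stream of "attempts" with a static list of blocked emails and group the results by IP, so that I can later compute a bunch of related statistics. The results should be emitted as a 30-minute sliding window that advances every 10 seconds. Here is one of several ways I have tried to achieve this: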
```kotlin
override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        // Lateral join against the table function registered as blockedEmailsList.
        // ON TRUE with no email predicate pairs every attempt with every blocked email.
        "LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
        "WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
        // Hopping window: 30 minutes long, advancing every 10 seconds, grouped per IP.
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"
    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
```
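The HOP clause assigns every attempt to many overlapping windows at once. The following is a minimal plain-Java sketch of that assignment. It assumes millisecond timestamps and windows aligned to the epoch with no offset, and HopWindows and windowStartsFor are hypothetical names, not Flink APIs. With a 30-minute size and a 10-second slide, each attempt lands in 30 min / 10 s = 180 windows:

```java
import java.util.ArrayList;
import java.util.List;

public class HopWindows {
    static final long SLIDE_MS = 10_000L;          // INTERVAL '10' SECOND
    static final long SIZE_MS = 30L * 60 * 1_000;  // INTERVAL '30' MINUTE

    /** Start times of every window [start, start + SIZE_MS) that contains ts, newest first. */
    static List<Long> windowStartsFor(long ts) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned window start at or before ts.
        long lastStart = ts - Math.floorMod(ts, SLIDE_MS);
        // Step back one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 1_805_000L; // 30 min 5 s after the epoch
        List<Long> starts = windowStartsFor(ts);
        System.out.println(starts.size());                 // 180
        System.out.println(starts.get(0));                 // 1800000
        System.out.println(starts.get(starts.size() - 1)); // 10000
    }
}
```

Because every attempt contributes to 180 windows, each one is reflected in a new result every 10 seconds for half an hour.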
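The query uses the following user-defined table function, which is already registered in tableEnv as blockedEmailsList:

```java
import java.util.Collection;

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class BlockedEmailsList extends TableFunction<Row> {
    private Collection<String> emails;

    public BlockedEmailsList(Collection<String> emails) {
        this.emails = emails;
    }

    // Wraps a single email in a one-field Row.
    public Row read(String email) {
        return Row.of(email);
    }

    // Takes no arguments, so each call emits one row per email in the list.
    public void eval() {
        this.emails.forEach(email -> collect(read(email)));
    }
}
```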
However, running the query fails with the following error:

```
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
```
If I do as suggested and cast createdAt to TIMESTAMP, I get this instead:

```
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
```
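I found other questions on Stack Overflow about these exceptions, but they deal with streams and temporal tables; none of them covers joining a stream to a static list.
Any ideas?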
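EDIT: There appears to be an open proposal in the Flink project that covers my use case: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
So I'm also open to suggestions for alternative approaches.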
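Best answer
I managed to implement a workaround that solved my problem!
Instead of joining the stream of attempts with the static list of emails, I map each Attempt beforehand to an Attempt with an added blockedEmail attribute. If the static list blockedEmails contains the current attempt's email, its blockedEmail attribute is set to true.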
```java
// Flag each attempt whose email is on the static blocked list, so no join is needed later.
DataStream<Attempt> attemptsStream = sourceApi.<Attempt>startStream().map(new MapFunction<Attempt, Attempt>() {
    @Override
    public Attempt map(Attempt attempt) throws Exception {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }
});
```
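Because the lookup happens on the DataStream before it becomes a table, the query no longer contains a regular join, so the rowtime restriction from the first error does not come into play. The flagging logic is a stateless per-record function and can be tested without a Flink job. Below is a minimal sketch in which Attempt is a hypothetical, stripped-down stand-in for the poster's class (only email and blockedEmail), and markIfBlocked is a hypothetical helper mirroring the body of the MapFunction above:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class EnrichAttempts {
    /** Hypothetical stand-in for Attempt: only the two fields the enrichment touches. */
    static class Attempt {
        private final String email;
        private boolean blockedEmail; // stays false unless the email is on the blocked list

        Attempt(String email) { this.email = email; }
        String getEmail() { return email; }
        boolean isBlockedEmail() { return blockedEmail; }
        void setBlockedEmail(boolean blockedEmail) { this.blockedEmail = blockedEmail; }
    }

    /** Same logic as the MapFunction body: flag the attempt if its email is blocked. */
    static Attempt markIfBlocked(Attempt attempt, Set<String> blockedEmails) {
        if (blockedEmails.contains(attempt.getEmail())) { // HashSet lookup, O(1) on average
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }

    public static void main(String[] args) {
        Set<String> blockedEmails = new HashSet<>(Arrays.asList("spam@example.com"));
        Attempt blocked = markIfBlocked(new Attempt("spam@example.com"), blockedEmails);
        Attempt allowed = markIfBlocked(new Attempt("alice@example.com"), blockedEmails);
        System.out.println(blocked.isBlockedEmail()); // true
        System.out.println(allowed.isBlockedEmail()); // false
    }
}
```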
The static list blockedEmails is a HashSet, so each lookup takes O(1) time on average. Finally, I adjusted the grouping query to:
```kotlin
override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS true THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "WHERE Attempts.email <> '' " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"
    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
```
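To spell out what the adjusted query computes per IP, here is a plain-Java sketch that applies the same filter and counters to an in-memory list of attempts belonging to a single window; the windowing itself is left out. The Attempt fields and the WindowStats, Stats and aggregate names are hypothetical, and all fields are assumed non-null (unlike SQL, where a NULL success would count as neither a fail nor a success):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WindowStats {
    /** Hypothetical, simplified Attempt; id 0 marks a non-existing account. */
    static class Attempt {
        final String ip;
        final long id;
        final String email;
        final boolean success;
        final boolean blockedEmail;

        Attempt(String ip, long id, String email, boolean success, boolean blockedEmail) {
            this.ip = ip;
            this.id = id;
            this.email = email;
            this.success = success;
            this.blockedEmail = blockedEmail;
        }
    }

    /** Per-IP counters matching the columns selected by the adjusted query. */
    static class Stats {
        long fails, successes, nonExistingAccounts, blockedAccounts;
        final Set<Long> distinctIds = new HashSet<>();

        long accounts() { return distinctIds.size(); } // COUNT(DISTINCT id)
    }

    /** Aggregates the attempts of ONE window, keyed by IP in first-seen order. */
    static Map<String, Stats> aggregate(List<Attempt> window) {
        Map<String, Stats> byIp = new LinkedHashMap<>();
        for (Attempt a : window) {
            if (a.email.isEmpty()) continue;                         // WHERE Attempts.email <> ''
            Stats s = byIp.computeIfAbsent(a.ip, ip -> new Stats()); // GROUP BY ..., ip
            if (a.success) s.successes++; else s.fails++;
            s.distinctIds.add(a.id);
            if (a.id == 0) s.nonExistingAccounts++;
            if (a.blockedEmail) s.blockedAccounts++;                 // blockedEmail IS true
        }
        return byIp;
    }

    public static void main(String[] args) {
        List<Attempt> window = Arrays.asList(
                new Attempt("10.0.0.1", 5, "a@example.com", false, false),
                new Attempt("10.0.0.1", 5, "a@example.com", true, false),
                new Attempt("10.0.0.1", 0, "ghost@example.com", false, true),
                new Attempt("10.0.0.2", 7, "", false, false), // filtered out: empty email
                new Attempt("10.0.0.2", 7, "b@example.com", false, false));
        aggregate(window).forEach((ip, s) -> System.out.printf(
                "%s accounts=%d fails=%d successes=%d non_existing=%d blocked=%d%n",
                ip, s.accounts(), s.fails, s.successes, s.nonExistingAccounts, s.blockedAccounts));
        // 10.0.0.1 accounts=2 fails=2 successes=1 non_existing=1 blocked=1
        // 10.0.0.2 accounts=1 fails=1 successes=0 non_existing=0 blocked=0
    }
}
```

Note that COUNT(DISTINCT id) counts id 0 as one account too, which the sketch reproduces by adding it to the set.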
As far as I can tell, joining a stream with a static list is still an unsolved problem, but in my case the workaround above does the job.