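I want to join a stream of "attempts" with a static list of blocked emails and group the results by IP, so that I can later compute a set of related statistics. The results should be emitted as a 30-minute sliding window that advances every 10 seconds. Here is one of several ways I tried to do this: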

override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
        "WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
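For reference, `HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE)` assigns each row to every 30-minute window that contains its timestamp and whose start is a multiple of 10 seconds. The plain-Java sketch below has no Flink dependency, and the class and method names are hypothetical. It lists those window starts for one timestamp in epoch milliseconds, assuming windows aligned to epoch 0 with no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink API): lists the HOP windows that contain one
// timestamp, assuming windows aligned to epoch 0 with no offset.
final class HopWindows {
    private HopWindows() {}

    /** Returns the start of every window [start, start + sizeMs) that contains ts. */
    static List<Long> windowStarts(long ts, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before ts, aligned to a multiple of the slide.
        long lastStart = ts - Math.floorMod(ts, slideMs);
        // Step back one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, `HopWindows.windowStarts(3_604_567L, 10_000L, 1_800_000L)` returns 180 starts, from 3_600_000 down to 1_810_000. With a 10-second slide and a 30-minute size, every attempt contributes to 1800 / 10 = 180 window results.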

This uses the user-defined table function below, which is already registered in tableEnv as blockedEmailsList:
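import java.util.Collection;

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Emits one single-column row per blocked email. eval() takes no arguments,
// so the output does not depend on the row it is joined with.
public class BlockedEmailsList extends TableFunction<Row> {
    private final Collection<String> emails;

    public BlockedEmailsList(Collection<String> emails) {
        this.emails = emails;
    }

    // Wraps a single email in a one-field Row.
    public Row read(String email) {
        return Row.of(email);
    }

    // Called for each input row; collect() emits one output row per blocked email.
    public void eval() {
        this.emails.forEach(email -> collect(read(email)));
    }
}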
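Separately from the exception, note what this join computes: because `eval()` takes no arguments, `LEFT JOIN LATERAL TABLE(blockedEmailsList()) ... ON TRUE` is uncorrelated. Every attempt row is therefore paired with every blocked email. `COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END)` then counts pairs rather than blocked attempts, and the other non-distinct counts are multiplied by the size of the list. The plain-Java sketch below uses hypothetical names and data. It contrasts that cross product with the per-attempt membership check the query presumably intended.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration (plain Java, not Flink) of the two join semantics.
final class LateralJoinDemo {
    private LateralJoinDemo() {}

    /** Uncorrelated LEFT JOIN ... ON TRUE: each attempt email is paired with every blocked email. */
    static List<String[]> crossJoin(List<String> attemptEmails, List<String> blocked) {
        List<String[]> rows = new ArrayList<>();
        for (String email : attemptEmails) {
            if (blocked.isEmpty()) {
                // LEFT JOIN keeps the attempt and pads blockedEmail with NULL.
                rows.add(new String[] {email, null});
            } else {
                for (String b : blocked) {
                    rows.add(new String[] {email, b});
                }
            }
        }
        return rows;
    }

    /** COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) over the joined rows. */
    static long countNonNullBlocked(List<String[]> joinedRows) {
        return joinedRows.stream().filter(row -> row[1] != null).count();
    }

    /** Per-attempt membership check: counts attempts whose own email is blocked. */
    static long countBlockedAttempts(List<String> attemptEmails, List<String> blocked) {
        Set<String> blockedSet = new HashSet<>(blocked);
        return attemptEmails.stream().filter(blockedSet::contains).count();
    }
}
```

With attempts `a@x`, `b@x`, `c@x` and a blocked list of `b@x`, `z@x`, the cross join yields 6 rows that all have a non-null blockedEmail, so the query would report 6, although only 1 attempt is actually blocked.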

However, it fails with the following error:
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.


If I follow the suggestion and cast createdAt to TIMESTAMP, I get this instead:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.

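I found other questions here on Stack Overflow about these exceptions, but they deal with streams and temporal tables. None of them covers joining a stream with a static list.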

Any ideas?

Edit: There seems to be an open proposal in the Flink project that covers my use case: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

So I'm also open to suggestions for alternative approaches.

Best answer

I managed to implement a workaround that solved my problem!

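Instead of joining the stream of attempts with the static list of emails, I map each attempt beforehand to an attempt with an added blockedEmail attribute. If the static list blockedEmails contains the current attempt's email, its blockedEmail attribute is set to true: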

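import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// Flag each attempt whose email is in the static blockedEmails set before the SQL query runs.
DataStream<Attempt> attemptsStream = sourceApi.<Attempt>startStream().map(new MapFunction<Attempt, Attempt>() {
    @Override
    public Attempt map(Attempt attempt) throws Exception {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }
});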

The static list blockedEmails is a HashSet, so each lookup is expected O(1).
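The enrichment step does not depend on Flink, so it can be checked in isolation. The sketch below uses a hypothetical, trimmed-down `Attempt` class with only `email` and `blockedEmail` fields (the real class also carries fields such as `ip`, `id`, `success` and `createdAt`). It applies the same `contains` check as the `map` function above, wrapped in a hypothetical `BlockedEmailEnricher` class.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, trimmed-down stand-in for the Attempt POJO used in the job.
final class Attempt {
    private final String email;
    private boolean blockedEmail; // false unless the enrichment step flags it

    Attempt(String email) {
        this.email = email;
    }

    String getEmail() {
        return email;
    }

    boolean isBlockedEmail() {
        return blockedEmail;
    }

    void setBlockedEmail(boolean blockedEmail) {
        this.blockedEmail = blockedEmail;
    }
}

// Hypothetical wrapper around the same logic as the MapFunction.
final class BlockedEmailEnricher {
    private final Set<String> blockedEmails;

    BlockedEmailEnricher(Collection<String> blockedEmails) {
        // Copy into a HashSet so contains() is an expected O(1) lookup.
        this.blockedEmails = new HashSet<>(blockedEmails);
    }

    /** Flags the attempt if its email is on the static list, then passes it on. */
    Attempt map(Attempt attempt) {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }
}
```

Because the set is read-only after construction, the check needs no synchronization. The trade-off compared with a join is that changing the list requires restarting the job.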

Finally, I adjusted the grouping query to:
override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS true THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "WHERE Attempts.email <> '' " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
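To make the `CASE WHEN` counts concrete, the sketch below computes the same five statistics per IP, in plain Java, for the attempts that fall into a single window. It assumes a hypothetical row type with `ip`, `id`, `email`, `success` and `blockedEmail` fields. It applies the `email <> ''` filter and treats `id = 0` as a non-existing account, as the query does. It uses primitive booleans, so SQL NULL handling for `success` and `blockedEmail` is not modeled. It only illustrates the query's semantics, not how Flink evaluates it.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java model of the final query's SELECT list for the rows of one window.
final class IpStatsDemo {
    private IpStatsDemo() {}

    // Hypothetical row type holding only the columns the query reads.
    static final class AttemptRow {
        final String ip;
        final long id;
        final String email;
        final boolean success;
        final boolean blockedEmail;

        AttemptRow(String ip, long id, String email, boolean success, boolean blockedEmail) {
            this.ip = ip;
            this.id = id;
            this.email = email;
            this.success = success;
            this.blockedEmail = blockedEmail;
        }
    }

    // Per-IP accumulator; each field corresponds to one column of the SELECT list.
    static final class Stats {
        long fails;
        long successes;
        long nonExistingAccounts;
        long blockedAccounts;
        final Set<Long> distinctIds = new HashSet<>();

        long accounts() {
            return distinctIds.size(); // COUNT(DISTINCT id)
        }
    }

    static Map<String, Stats> aggregate(List<AttemptRow> windowRows) {
        Map<String, Stats> byIp = new LinkedHashMap<>();
        for (AttemptRow r : windowRows) {
            // WHERE Attempts.email <> '' (a NULL email is filtered out, as in SQL).
            if (r.email == null || r.email.isEmpty()) {
                continue;
            }
            Stats s = byIp.computeIfAbsent(r.ip, key -> new Stats()); // GROUP BY ip
            if (r.success) {
                s.successes++;
            } else {
                s.fails++;
            }
            s.distinctIds.add(r.id);
            if (r.id == 0) {
                s.nonExistingAccounts++;
            }
            if (r.blockedEmail) {
                s.blockedAccounts++;
            }
        }
        return byIp;
    }
}
```

Note that, as in the query, an `id` of 0 is also counted once in `accounts`.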

The general problem of joining a stream with a static list still seems to be unresolved, but in my case the workaround above solved it.

07-26 02:53