创建具有通用返回类型的FlinkSQL

创建具有通用返回类型的FlinkSQL

本文介绍了创建具有通用返回类型的FlinkSQL UDF的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想定义函数MAX_BY,该函数采用类型T的值和类型Number的排序参数,并根据排序(类型T)从窗口返回max元素.我已经尝试过

I would like to define function MAX_BY that takes value of type T and ordering parameter of type Number and returns max element from window according to ordering (of type T). I've tried

public class MaxBy<T> extends AggregateFunction<T, Tuple2<T, Number>> {

    @Override
    public T getValue(Tuple2<T, Number> tuple) {
        return tuple.f0;
    }

    @Override
    public Tuple2<T, Number> createAccumulator() {
        return Tuple2.of(null, 0L);
    }

    public void accumulate(Tuple2<T, Number> acc, T value, Number order) {
        if (order.doubleValue() > acc.f1.doubleValue()) {
            acc.f0 = value;
            acc.f1 = order;
        }
    }
}

,但是我无法使用TableEnvironment.registerFunction注册此类功能.在Flink的下面,使用TypeInformation来匹配SQL查询中的类型,并且具有这样的定义,它不能确定类型(至少我是这样认为的).我看到可以提供几个accumulate函数,但是-我认为每个重载方法的返回类型都必须相同.

but I cannot register such function using TableEnvironment.registerFunction. Underneath Flink uses TypeInformation to match types within SQL query and with such definition it cannot determine types (at least that's what I suppose). I saw that it is possible to provide several accumulate functions but still - I think return type must be same for each overloaded method.

内置的聚集函数的工作方式与我要实现的类似-MAX可以采用任意列类型并返回相同的类型.这就是为什么我认为我也应该能够做到这一点.

Built-in aggregation functions work similarly to what I want to achieve - MAX can take arbitrary column type and return the same type. That's why I suppose I should be able to do it as well.

推荐答案

不幸的是,Flink不支持具有灵活返回类型的聚合函数.对于MAX函数,内部实现独立于类型定义内部逻辑,然后为每种受支持的类型创建一个实现(参见代码).

Unfortunately, Flink doesn't support aggregation functions with flexible return types. For the MAX function, the internal implementation defines the core logic independent of the the type and then creates an implementation for every supported type (see code).

然后在内部,根据类型将MAX映射到正确的实现.

Internally, MAX is then mapped to the right implementation, depending on the type.

如果您将函数定义并注册为用户定义的聚合函数,我认为这是不可能的.

I don't think that's possible if you define and register a function as user-defined aggregation function.

这篇关于创建具有通用返回类型的FlinkSQL UDF的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-14 04:44