我正在尝试使用Apache Flink ML软件包的StochasticOutlierSelection模型。

我不知道如何将其与Kafka用作数据源,我知道它需要一个DataSet而不是DataStream,但是我似乎无法通过窗口将Kafka DataStream变成DataSet。

有没有一种方法可以将流视为一系列小型DataSet。例如,是否有办法说出流中与模式匹配的每10个元素(通过元素唯一ID滑动窗口)将它们视为固定大小的DataSet并检测此固定大小的数据集中的异常值?

我要创建的方案是:

数据源-> Kafka主题1-> Flink预处理-> Kafka主题2->按ID进行Flink分组->组上的异常值检测

我已经可以进行预处理,并且希望Flink能够满足我的要求?

最佳答案

我猜您可以创建一个基于计数的全局窗口,并使用ExecutionEnvironment获取数据集。类似以下内容可能会起作用(getResult将返回一个DataSet):


      stream.
      keyBy(...).
      window(GlobalWindows.create).
      trigger(CountTrigger.of(10)).
      aggregate(new MyAggregator()).
      ...

    class MyAggregator extends AggregateFunction[..., ..., ...] {

      var valueList: List[LabeledVector] = List[LabeledVector]()

      override def createAccumulator(): MyAggregator = new MyAggregator()
      override def add(value: .., accumulator: MyAggregator): ... = ...
      override def merge(agg1: MyAggregator, agg2: MyAggregator): ... = ...
      override def getResult(accumulator: MyAggregator): ... = {
        ExecutionEnvironment.getExecutionEnvironment.fromCollection(valueList)
      }
    }

08-25 08:09