我正在尝试使用Apache Flink ML软件包的StochasticOutlierSelection模型。
我不知道如何将其与Kafka用作数据源,我知道它需要一个DataSet而不是DataStream,但是我似乎无法通过窗口将Kafka DataStream变成DataSet。
有没有一种方法可以将流视为一系列小型DataSet。例如,是否有办法说出流中与模式匹配的每10个元素(通过元素唯一ID滑动窗口)将它们视为固定大小的DataSet并检测此固定大小的数据集中的异常值?
我要创建的方案是:
数据源-> Kafka主题1-> Flink预处理-> Kafka主题2->按ID进行Flink分组->组上的异常值检测
我已经可以进行预处理,并且希望Flink能够满足我的要求?
最佳答案
我猜您可以创建一个基于计数的全局窗口,并使用ExecutionEnvironment获取数据集。类似以下内容可能会起作用(getResult将返回一个DataSet):
stream.
keyBy(...).
window(GlobalWindows.create).
trigger(CountTrigger.of(10)).
aggregate(new MyAggregator()).
...
class MyAggregator extends AggregateFunction[..., ..., ...] {
var valueList: List[LabeledVector] = List[LabeledVector]()
override def createAccumulator(): MyAggregator = new MyAggregator()
override def add(value: .., accumulator: MyAggregator): ... = ...
override def merge(agg1: MyAggregator, agg2: MyAggregator): ... = ...
override def getResult(accumulator: MyAggregator): ... = {
ExecutionEnvironment.getExecutionEnvironment.fromCollection(valueList)
}
}