数据流API的批处理模式失败

数据流API的批处理模式失败

本文介绍了Apache Flink:数据流API的批处理模式失败,异常`IllegalStateException:排序输入不允许检查点。`的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

继续:Flink : Handling Keyed Streams with data older than application watermark

基于该建议,我一直在尝试在使用数据流API的同一个Flink应用程序中添加对批处理的支持。

逻辑如下:

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.readTextFile("fileName")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
                .<DetectionEvent>forBoundedOutOfOrderness(orderness)
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);

基于公共文档,我的理解是我只需要将源代码更改为有限制的源代码即可。但是,上述处理在窗口化步骤之后的事件触发器处继续失败,出现以下异常:

java.lang.IllegalStateException: Checkpointing is not allowed with sorted inputs.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)

输入文件包含多个键的历史事件。对给定键的数据进行排序,但不对整个数据进行排序。我还在每个键的末尾添加了一个事件,其TIMESTAMP=MAX_WATERMARK指示键控流的结束。我也尝试了单个密钥,但处理失败,出现相同的异常。

注意:我尚未启用检查点。我还尝试过显式禁用检查点,但无济于事。

env.getCheckpointConfig().disableCheckpointing();

编辑-1

添加更多详细信息:我尝试更改并使用FileSource读取文件,但仍收到相同的异常。

environment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")

第一个流程步骤和密钥拆分起作用。然而,在那之后它就失败了。我尝试删除窗口并添加一个简单的流程步骤,但仍然失败。没有显式接收器。最后一个进程函数只是更新数据库。

我是不是遗漏了什么?

推荐答案

只有在启用检查点时才能引发该异常。也许您可以在Flink-conf.yaml中配置检查点间隔?

这篇关于Apache Flink:数据流API的批处理模式失败,异常`IllegalStateException:排序输入不允许检查点。`的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-31 04:34