正确设置项目后,我需要将几个.bsq文件读入/导入到我的环境中。我试图像这样使用env.readFile()方法:

DataSet<T> data = env.readFile(*insertFileInputFormatHere*, filePath);


但是我无法获得正确的FileInputFormat。由于它是抽象的,因此我无法拥有自己的实例。我应该扩展abstract class FileInputFormat并使用自己的扩展来实例化FileInputFormat吗?还是我不认识的另一种方式?

最佳答案

似乎.bsq文件是不存在Flink或Hadoop InputFormat的二进制格式(我包括Hadoop InputFormats,因为Flink还支持Hadoop IF)。

因此,您必须实现自己的InputFormat来读取文件。我建议从Flink的org.apache.flink.api.common.io.FileInputFormat扩展您自己的InputFormat。

要使用自己的输入格式,您必须像

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.createInput(new MyInputFormat());

09-11 08:48