因此,我必须检索HDFS中存储的文件的内容并对其进行某些分析。
问题是,我什至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。 (我是Flink的新手,这只是一项测试,以确保我可以正确读取文件)
HDFS中的文件是纯文本文件。这是我的代码:
public class readFromHdfs {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("hdfs://localhost:9000//test/testfile0.txt");
lines.writeAsText("/tmp/hdfs_file.txt");
env.execute("File read from HDFS");
}
}
运行它后,/ tmp中没有输出。
这是一个非常简单的代码,我不确定它是否有问题,或者我只是在做其他错误。正如我所说,我对Flink完全陌生
另外,该作业在Web仪表板中显示为失败。这是flink日志的内容:https://pastebin.com/rvkXPGHU
提前致谢
编辑:我通过增加任务插槽的数量解决了问题。 Web仪表板显示了一个可用的任务插槽,它并没有抱怨根本没有足够的插槽,所以我不认为是那样。
无论如何,writeAsText并没有按照我的预期工作。我从testfile0.txt中读取了内容,但是没有将它们写入hdfs_file.txt中。而是使用该名称创建目录,并在其中创建8个文本文件,其中6个完全为空。其他两个包含testfile0.txt(大部分位于1.txt中,最后一个块位于2.txt中)。
尽管这并不重要,因为文件的内容已正确存储在DataSet中,所以我可以继续分析数据。
最佳答案
它按预期方式工作-您已将完整作业的并行度(因此也将输出格式)设置为8,因此每个插槽都会创建自己的文件(如您所知,同时写入单个文件是不安全的)。如果只需要1个输出文件,则应writeAsText(...).setParalellis(1)
覆盖全局并行性属性。
如果要在本地文件系统而不是HDFS中获取输出,则应在路径中显式设置“ file://”协议,因为对于Hadoop flink,默认情况下看起来为“ hdfs://”。
关于java - Apache Flink:从HDFS读取文件,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50409946/