Problem description
I'm using Spark to read the data of all files under an HDFS directory and its subdirectories into a single RDD. I could not find any efficient method to do that, so I tried to write some customized code, as shown below:
public Object fetch(String source, String sink) {
    // reading data
    boolean isDir = new File(source).isDirectory();
    System.out.println("isDir=" + isDir);
    JavaRDD<String> lines;
    if (isDir) {
        lines = readFiles(new File(source).listFiles(), null);
    } else {
        lines = sc.textFile(source);
    }
    lines.saveAsTextFile(sink);
    return true;
}
public static JavaRDD<String> readFiles(File[] files, JavaRDD<String> lines) {
    for (File file : files) {
        if (file.isDirectory()) {
            readFiles(file.listFiles(), lines); // Calls same method again.
        } else {
            if (lines == null) {
                lines = sc.textFile(file.getPath());
            } else {
                JavaRDD<String> r = sc.textFile(file.getPath());
                lines.union(r);
            }
        }
    }
    return lines;
}
But this is not doing the job I expected: isDir contains false, telling me that it's not a directory. Can you guide me about what's wrong? And is there some efficient way to do this job? Thanks a lot.
Recommended answer
Since Spark can read data based on a Hadoop Job configuration, you can use the FileInputFormat#setInputDirRecursive method:
JavaSparkContext context = new JavaSparkContext();
Job job = null;
try {
    job = Job.getInstance();
    // Point the input format at the root directory and turn on recursive traversal of sub-directories
    FileInputFormat.setInputPaths(job, new Path("/path/to/input/directory"));
    FileInputFormat.setInputDirRecursive(job, true);
} catch (IOException e1) {
    e1.printStackTrace();
    System.exit(1);
}
JavaRDD<Text> sourceData = context.newAPIHadoopRDD(job.getConfiguration(),
        TextInputFormat.class, LongWritable.class, Text.class)
    .values();
Obviously you will end up with a Text data type instead of a String.
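If you need plain strings, a minimal sketch (assuming the sourceData RDD from the snippet above, with a hypothetical output path) is to map each Text value to its string form before saving:

// Convert the Hadoop Text values into plain Java strings; toString() copies the
// bytes out, which also avoids problems caused by Hadoop reusing Text objects.
JavaRDD<String> lines = sourceData.map(text -> text.toString());
lines.saveAsTextFile("/path/to/output/directory"); // placeholder output path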