我正在使用Hadoop 0.20.2(无法更改),并且想向我的输入路径添加过滤器。数据如下:

/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2

我只想处理所有包含Train的文件。

看一下FileInputFormat类建议使用:
 FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)

这就是我的问题所在,因为PathFilter是一个接口(interface)-当然,我可以扩展该接口(interface),但那时我仍然没有实现。因此,我实现了接口(interface):
class TrainFilter implements PathFilter
{
   boolean accept(Path path)
   {
      return path.toString().contains("train");
   }
}

当我使用TrainFilter作为PathFilter时,代码会编译,但是当我运行它时,由于输入路径被弄乱了,因此我得到了一个异常(exception)。如果不设置过滤器,我的代码将遍历/ path1下的所有文件,但是,在设置过滤器时,它将引发错误:
InvalidInputException: Input path does not exist hdfs://localhost:9000/path1

这是我在驱动程序代码中设置的方法:
job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);

有什么建议我在这里做错了吗?

编辑:我发现了问题。对PathFilter的第一次调用始终是目录本身(/ path1),并且由于它不包含目录(“train”),因此目录本身无效,因此会引发异常。这使我想到另一个问题:如何测试任意路径是否是目录?就我所知,我需要引用FileSystem,这不是PathFilter的默认参数之一。

最佳答案

或者,您可以尝试遍历给定目录中的所有文件,并检查文件名是否以train开头。例如:

        Job job = new Job(conf, "myJob");
        List<Path> inputhPaths = new ArrayList<Path>();

        String basePath = "/user/hadoop/path";
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/train*"));
        for (FileStatus fstat : listStatus) {
            inputhPaths.add(fstat.getPath());
        }

        FileInputFormat.setInputPaths(job,
                (Path[]) inputhPaths.toArray(new Path[inputhPaths.size()]));

10-08 12:37