Hadoop有一个抽象的文件系统概念,HDFS只是其中一个实现。Java抽象类org.apache.hadoop.fs.FileSystem定义了Hadoop中的一个文件系统接口,并且该抽象类有几个具体的实现。Hadoop对文件系统提供了很多接口,它一般使用URI方案来选取合适的文件系统实例进行交互,比如:要想列出本地文件系统根目录下的恩见,可以输入如下命令:
        % hadoop fs -ls file:///
        Hadoop是用Java编写的,通过Java API可以调用所有Hadoop文件系统的交互操作,比如:文件系统的命令解释器就是一个Java应用,它使用Java的FileSystem类来提供文件系统操作,本文会深入探索Hadoop的FileSystem类:与Hadoop的某一文件系统进行交互的API。
一、从Hadoop URL中读取数据
1、通过FileSystem API读取数据
        可以通过使用FileSystem API来读取数据,此时,需要使用FileSystem API来打开一个文件的输入流。Hadoop文件系统中通过Hadoop Path对象来代表文件(而非java.io.File对象,因为它的语义与本地文件联系联系太紧密)。你可以将一条路径视为一个Hadoop文件系统URI,比如:hdfs://localhost/user/tom/quangle.txt。
        FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS。获取FileSystem实例有两种静态工厂方法:
        public static FileSystem get(Configuration conf) throws IOException
        public static FileSystem get(URI uri, Configuration conf) throws IOException
        其中,Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取数据类路径来实现
(比如conf/core-site.xml)。第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,如果没没有指定,则使用默认的本地文件系统)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。

        有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:
        public FSDataInputStream open(Path f) thows IOException
        public abstract FSDataInputStream open(Path f, in bufferSize) throws IOException
        第一个方法使用默认的缓冲区大小4KB。
        将上述方法结合起来,直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件:
        public class FileSystemCat {
            public static void main(String[] args) throws Exception {
                String uri = args[0];
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri), conf);
                InputStream in = null;
                try {
                    in = fs.open(new Path(uri));
                    IOutils.copyBytes(in, System.out, 4096, false);
                } finally {} 
            }
        }
        上面调用Hadoop中简洁的IOUtils类,并在finally子句中关闭数据流,同时也可以在输入流和输出流之间复制数据,copyBytes方法的最后两个参数,第一个用于设置复制的缓冲区大小,第二个用于设置复制结束后是否关闭数据流,这里选择自行关闭输入流,因此System.out不关闭输入流。执行如下命令运行程序:
        % hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
2、FSDataInputStream
        实际上,FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。这个类是继承了java.io.DataInputStream接口的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据,比如:
        package org.apache.hadoop.fs;
        public class FSDataInputStream extends DataInputStream
                implements Seekable, PositionedReadable {
                // implementation elided
        }
        
        Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量的查询方法:
        public interface Seekable {
            void seek(long pos) throws IOException;        //移到文件中任意一个绝对位置
            long getPos() throws IOException;                //得到相对于文件起始位置偏移量
            boolean seekToNewSource(long targetPos) throws IOException;
        }
        FSDataInputStream类也实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分:
        public interface PositionedReadable {
            public int read(long position, byte[] buffer, int offset, int length) throws IOException;
            public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
            public void readFully(long position, byte[] buffer) throws IOException;
        }
        read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏移量offset处,返回值是实际读到的字节数,调用者需要检查这个值,它有可能小于指定的length长度。
    
二、写入数据
1、通过FileSystem API写入数据       
        FileSystem类有一系列创建文件的方法。最简单的方法是给准备创建的文件指定一个Path对象,然后返回一个用于写入数据的输出流:
        public FSDataOutputStream create(Path f) throws IOException
        上述方法有多个重载版本,允许我们指定是否需要强制覆盖已有的文件、文件备份数量、写入文件时所用缓冲区大小、文件块大小以及文件权限。
        create()方法能够为需要写入且当前不存在的文件创建父目录,尽管这样很方便,但有时并不希望这样。如果你希望不存在父目录就发生写入失败,则应该先调用exists()方法检查文件父目录是否存在。
        还有一个重载方法Progressable,用于传递回调接口,如此一来,就可以把数据写入数据节点的进度通知到你的应用。
        另一个新建文件的方法,是使用append()方法在一个已有文件末尾追加数据(还存在一些其它重载版本)。
        下面举例说明如何将本地文件复制到Hadoop文件系统:
        public class FileCopyWithProgress {
            public static void main(String[] args) throws Exception {
                String localSrc = args[0];
                String dst = args[1];
                InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(dst), conf);
                OutputStream out = fs.create(new Path(dst), new Progressable() {
                    public void progress() {
                        System.out.print(".");
                    }
                });
                IOUtils.copyBytes(in, out, 4096, true);
            }
        }
        执行如下命令运行程序:
        % hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/1400-8.txt
2、FSDataOutputStream对象
        FileSystem实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类相似,它也有一个查询文件当前位置的方法:
        package org.apache.hadoop.fs;
        public class FSDataOutputStream extends DataOutputStream implements Syncable {
            public long getPos() throws IOException {
                // implementation elided
            }
            // implementation elided
        }        
        但与FSDataInputStream类不同的是:FSDataOutputStream类不允许在文件中定位。这是因为HDFS只允许对一个已打开的文件顺序写入,或在现有文件的末尾追加数据。
3、目录
        FileSystem实例提供了创建目录的方法:
        public boolean mkdirs(Path f) throws IOException
        这个方法可以一次性新建所有必要但还没有的父目录,就像java.io.File类的mkdirs()方法。如果目录(以及所有父目录)都已经创建成功,则返回true。
        通常,你不需要显示创建一个目录,因为调用create()方法写入文件时会自动创建父目录。

三、查询文件系统
1、文件元数据
        任何文件系统的一个重要特征都是提供其目录结构浏览和检索它所存文件和目录相关信息的功能。
        FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及权限信息。
        FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。
2、列出文件
        FileSystem提供listStatus()方法用于列出目录的内容,方法接口如下:
        public FileStatus[] listStatus(Path f) throws IOException;
        public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
        public FileStatus[] listStatus(Path[] files) throws IOException
        public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException
        当传入的参数是一个文件时,它会简单转变成以数组方式返回长度为1的FileStatus对象。当传入参数是一个目录时,则返回0或多个FileStatus对象,表示此目录中包含的文件和目录。
        一种重载方法是允许使用PathFilter来限制匹配的文件和目录。最后,如果指定一组路径,其执行结果相当于依次轮流传递每条路径并对其调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中,但该方法更为方便。这从文件系统树的不同分支构建输入文件列表时,这是很有用的。下面举例介绍这种方法:
        public class ListStatus {
            public static void main(String[] args) throws Exception {
                String uri = args[0];
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri), conf);
                Path[] paths = new Path[args.length];
                for (int i=0; i                    paths[i] = new Path(args[i]);
                }
                FileStatus[] status = fs.listStatus(paths);
                Path[] listedPaths = FileUtil.stat2Paths(status);
                for (Path p : listedPaths) {
                    System.out.println(p);
                } 
            }
        }        
        需要注意的是:FileUtil中stat2Paths()方法的使用,其将一个FileStatus对象数组转换为Path对象数组,执行如下命令运行程序:
        % hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
3、文件模式
        在单个操作中处理一批文件,这是一个常见要求,比如:处理日志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件,这是在一个表达式中使用通配符来匹配多个文件是比较方便的。Hadoop为执行通配提供了两个FileSystem方法:
        public FileStatus[] globStatus(Path pathPattern) throws IOException
        public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
        globStatus()方法返回与路径相匹配的所有文件的FileStatus对象数组,并按路径排序。PathFilter命令作为可选项可以进一步对匹配进行限制。Hadoop支持的通配符与Unix bash相同。
4、PathFilter对象
        通配符模式并不总能够精确地描述我们想要访问的文件集,比如:使用通配格式排除一个特定的文件就不太可能。FileSystem中的listStatus()和globStatus()方法通过了可选的PathFilter对象,使我们能够提供编程方式控制通配符:
        package org.apache.hadoop.fs;
        public interface PathFilter {
            boolean accept(Path path);
        }
        PathFilter与java.io.FileFilter一样,是Path对象而不是File对象。

四、删除数据
        使用FileSystem的delete()方法可以永久性删除文件或目录:
        public boolean delete(Path f, boolean recursive) throws IOException
        如果f是一个文件或空目录,那么recursive的值就会被忽略。只有在recursive值为true时,一个非空目录及其内容才会被删除(否则会抛出IOException异常)。


09-18 16:00