HDFS中JAVA API的使用

 

  HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。下面记录一下使用JAVA API对HDFS中的文件进行操作的过程。

  对分HDFS中的文件操作主要涉及一下几个类:

  Configuration类:该类的对象封转了客户端或者服务器的配置。

  FileSystem类:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作。FileSystem fs = FileSystem.get(conf);通过FileSystem的静态方法get获得该对象。

  FSDataInputStream和FSDataOutputStream:这两个类是HDFS中的输入输出流。分别通过FileSystem的open方法和create方法获得。

  具体如何对文件操作清下下面例子:

HDFS中JAVA API的使用-LMLPHP
  1 package com.hdfs;
2
3 import java.io.FileInputStream;
4 import java.io.IOException;
5 import java.io.InputStream;
6
7 import org.apache.hadoop.conf.Configuration;
8 import org.apache.hadoop.fs.FSDataOutputStream;
9 import org.apache.hadoop.fs.FileStatus;
10 import org.apache.hadoop.fs.FileSystem;
11 import org.apache.hadoop.fs.Path;
12 import org.apache.hadoop.io.IOUtils;
13
14 public class HdfsTest {
15
16 //创建新文件
17 public static void createFile(String dst , byte[] contents) throws IOException{
18 Configuration conf = new Configuration();
19 FileSystem fs = FileSystem.get(conf);
20 Path dstPath = new Path(dst); //目标路径
21 //打开一个输出流
22 FSDataOutputStream outputStream = fs.create(dstPath);
23 outputStream.write(contents);
24 outputStream.close();
25 fs.close();
26 System.out.println("文件创建成功!");
27 }
28
29 //上传本地文件
30 public static void uploadFile(String src,String dst) throws IOException{
31 Configuration conf = new Configuration();
32 FileSystem fs = FileSystem.get(conf);
33 Path srcPath = new Path(src); //原路径
34 Path dstPath = new Path(dst); //目标路径
35 //调用文件系统的文件复制函数,前面参数是指是否删除原文件,true为删除,默认为false
36 fs.copyFromLocalFile(false,srcPath, dstPath);
37
38 //打印文件路径
39 System.out.println("Upload to "+conf.get("fs.default.name"));
40 System.out.println("------------list files------------"+"\n");
41 FileStatus [] fileStatus = fs.listStatus(dstPath);
42 for (FileStatus file : fileStatus)
43 {
44 System.out.println(file.getPath());
45 }
46 fs.close();
47 }
48
49 //文件重命名
50 public static void rename(String oldName,String newName) throws IOException{
51 Configuration conf = new Configuration();
52 FileSystem fs = FileSystem.get(conf);
53 Path oldPath = new Path(oldName);
54 Path newPath = new Path(newName);
55 boolean isok = fs.rename(oldPath, newPath);
56 if(isok){
57 System.out.println("rename ok!");
58 }else{
59 System.out.println("rename failure");
60 }
61 fs.close();
62 }
63 //删除文件
64 public static void delete(String filePath) throws IOException{
65 Configuration conf = new Configuration();
66 FileSystem fs = FileSystem.get(conf);
67 Path path = new Path(filePath);
68 boolean isok = fs.deleteOnExit(path);
69 if(isok){
70 System.out.println("delete ok!");
71 }else{
72 System.out.println("delete failure");
73 }
74 fs.close();
75 }
76
77 //创建目录
78 public static void mkdir(String path) throws IOException{
79 Configuration conf = new Configuration();
80 FileSystem fs = FileSystem.get(conf);
81 Path srcPath = new Path(path);
82 boolean isok = fs.mkdirs(srcPath);
83 if(isok){
84 System.out.println("create dir ok!");
85 }else{
86 System.out.println("create dir failure");
87 }
88 fs.close();
89 }
90
91 //读取文件的内容
92 public static void readFile(String filePath) throws IOException{
93 Configuration conf = new Configuration();
94 FileSystem fs = FileSystem.get(conf);
95 Path srcPath = new Path(filePath);
96 InputStream in = null;
97 try {
98 in = fs.open(srcPath);
99 IOUtils.copyBytes(in, System.out, 4096, false); //复制到标准输出流
100 } finally {
101 IOUtils.closeStream(in);
102 }
103 }
104
105
106 public static void main(String[] args) throws IOException {
107 //测试上传文件
108 //uploadFile("D:\\c.txt", "/user/hadoop/test/");
109 //测试创建文件
110 /*byte[] contents = "hello world 世界你好\n".getBytes();
111 createFile("/user/hadoop/test1/d.txt",contents);*/
112 //测试重命名
113 //rename("/user/hadoop/test/d.txt", "/user/hadoop/test/dd.txt");
114 //测试删除文件
115 //delete("test/dd.txt"); //使用相对路径
116 //delete("test1"); //删除目录
117 //测试新建目录
118 //mkdir("test1");
119 //测试读取文件
120 readFile("test1/d.txt");
121 }
122
123 }
05-11 09:38