hadoop-小文件合并
package com.andy.merge;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
public class RegexAcceptFilter implements PathFilter{
private final String regex ;
public RegexAcceptFilter(String regex){
this.regex = regex ;
}
//只接受符合regex的文件
@Override
public boolean accept(Path path) {
boolean flag = path.toString().matches(regex) ;
return flag;
}
}
package com.andy.merge;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
//PathFilter是一个接口,里面只有一个方法accept(Path path)
public class RegexUncludeFilter implements PathFilter{
private final String regex ;
public RegexUncludeFilter(String regex){
this.regex = regex ;
}
//过滤 regex 格式的文件
@Override
public boolean accept(Path path) {
boolean flag = path.toString().matches(regex);
//符合得我就接受,不符合的就过滤,所以是非flag
return !flag;
}
}
package com.andy.merge;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
* 小文件合并
* @author huang
*
*/
public class MegerSmallFiles {
//写入到HDFS的FileSystem对象
private static FileSystem fs = null ;
//本地文件系统的FileSystem
private static FileSystem local = null ;
//HDFS服务路径
private static final String HDFS_SERVER = "hdfs://192.168.153.111:9000" ;
//合并小文件的主要方法
public static void megerFiles() throws Exception {
//设置系统用户为hadoop
System.setProperty("HADOOP_USER_NAME", "hadoop") ;
//读取hadoop文件的配置信息
Configuration conf = new Configuration() ;
//创建URI
URI uri = new URI(HDFS_SERVER) ;
//创建两个文件系统的fs
fs = FileSystem.get(uri, conf) ; //针对HDFS
local = FileSystem.get(conf) ; //针对本地文件系统
/* 获取指定路径下的所有文件
* 过滤该路径下的所有svn文件
* ^匹配一行的开头 ;.表示匹配任意一个字符
* *表示匹配0个或多个前面这个字符 ;$匹配一行的结束
* */
FileStatus[] globStatus = local.globStatus(new Path("D:/pdata/*"),
new RegexUncludeFilter("^.*svn$"));
//调试输出
for (FileStatus fileStatus : globStatus) {
System.out.println(fileStatus.getPath().toString());
}
//将一组FileStatus对象转换成Path对象
Path[] dirs = FileUtil.stat2Paths(globStatus);
//获取输入输出流
FSDataOutputStream out = null ;
FSDataInputStream in = null ;
for (Path dir : dirs) { //具体的每个目录下面的所有文件
//文件名称
String fileName = dir.getName().replaceAll("-", "") ;
//只接受该目录下的txt文件
FileStatus[] txtPaths = local.globStatus(new Path(dir + "/*") ,
new RegexAcceptFilter("^.*txt$"));
Path[] txtFiles = FileUtil.stat2Paths(txtPaths);
//设置输出路径
Path hdfsFile = new Path(HDFS_SERVER + "/vip/" + fileName + ".txt") ;
//打开输入输出流,进行读写
out = fs.create(hdfsFile) ; //输出流
for (Path p : txtFiles) {
in = local.open(p) ;
IOUtils.copyBytes(in, out, 4096, false);
//关闭输入流
in.close();
}
if(null != out){
out.close();
}
}
}
//程序入口
public static void main(String[] args) throws Exception {
megerFiles() ;
System.out.println("=====小文件合并成功=====");
}
}