hadoop-小文件合并

package com.andy.merge;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexAcceptFilter implements PathFilter{
	private final String regex ;

	public RegexAcceptFilter(String regex){
		this.regex = regex ;
	}

	//只接受符合regex的文件
	@Override
	public boolean accept(Path path) {
		boolean flag = path.toString().matches(regex) ;
		return flag;
	}

}
package com.andy.merge;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

//PathFilter是一个接口,里面只有一个方法accept(Path path)
public class RegexUncludeFilter implements PathFilter{
	private final String regex ;

	public RegexUncludeFilter(String regex){
		this.regex = regex ;
	}

	//过滤 regex 格式的文件
	@Override
	public boolean accept(Path path) {
		boolean flag = path.toString().matches(regex);
		//符合得我就接受,不符合的就过滤,所以是非flag
		return !flag;
	}


}
package com.andy.merge;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * 小文件合并
 * @author huang
 *
 */
public class MegerSmallFiles {
	//写入到HDFS的FileSystem对象
	private static FileSystem fs = null ;

	//本地文件系统的FileSystem
	private static FileSystem local = null ;

	//HDFS服务路径
	private static final String HDFS_SERVER = "hdfs://192.168.153.111:9000" ;

	//合并小文件的主要方法
	public static void megerFiles() throws Exception {
		//设置系统用户为hadoop
		System.setProperty("HADOOP_USER_NAME", "hadoop") ;

		//读取hadoop文件的配置信息
		Configuration conf = new Configuration() ;

		//创建URI
		URI uri = new URI(HDFS_SERVER) ;

		//创建两个文件系统的fs
		fs = FileSystem.get(uri, conf) ;	//针对HDFS
		local = FileSystem.get(conf) ;		//针对本地文件系统

		/* 获取指定路径下的所有文件
		 * 过滤该路径下的所有svn文件
		 *  ^匹配一行的开头 ;.表示匹配任意一个字符
		 *  *表示匹配0个或多个前面这个字符 ;$匹配一行的结束
		 * */
		FileStatus[] globStatus = local.globStatus(new Path("D:/pdata/*"),
				new RegexUncludeFilter("^.*svn$"));
		//调试输出
		for (FileStatus fileStatus : globStatus) {
			System.out.println(fileStatus.getPath().toString());
		}

		//将一组FileStatus对象转换成Path对象
		Path[] dirs = FileUtil.stat2Paths(globStatus);

		//获取输入输出流
		FSDataOutputStream out = null ;
		FSDataInputStream in = null ;

		for (Path dir : dirs) {	//具体的每个目录下面的所有文件
			//文件名称
			String fileName = dir.getName().replaceAll("-", "") ;
			//只接受该目录下的txt文件
			FileStatus[] txtPaths = local.globStatus(new Path(dir + "/*") ,
					new RegexAcceptFilter("^.*txt$"));
			Path[] txtFiles = FileUtil.stat2Paths(txtPaths);

			//设置输出路径
			Path hdfsFile = new Path(HDFS_SERVER + "/vip/" + fileName + ".txt") ;

			//打开输入输出流,进行读写
			out = fs.create(hdfsFile) ;	//输出流
			for (Path p : txtFiles) {
				in = local.open(p) ;
				IOUtils.copyBytes(in, out, 4096, false);
				//关闭输入流
				in.close();
			}
			if(null != out){
				out.close();
			}
		}
	}

	//程序入口
	public static void main(String[] args) throws Exception {
		megerFiles() ;
		System.out.println("=====小文件合并成功=====");
	}

}
05-06 21:52