// (Blog code-widget label, not part of the program: "click here to collapse or expand".)
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;
- /**
- * object: FileReadLineThread
- * author: 程晓鹏
- * description: 文件读取行线程类
- * date: 2018-04-13 16:51
- */
- public class FileReadLineThread extends Thread implements Closeable{
- protected static final Logger log = Logger.getLogger(FileReadLineThread.class);
- private ReadWriteBooleanLock isRun; //是否运行
- private ReadWriteBooleanLock isBreakWhile; //是否跳出while循环
- private IFileReadLineNotify readLineNotify;
- private List<ReadFileInfo> files; //待处理文件集合
- private long closeValidSleepTime; //关闭验证时休眠时间
- private long closeStartTime; //关闭开始时间
- /**
- * 默认构造函数
- */
- public FileReadLineThread(){
- this.setName(this.getClass().getName().toString()); //设置线程名
- this.files = new ArrayList<ReadFileInfo>();
- this.isRun = new ReadWriteBooleanLock(true);
- this.isBreakWhile = new ReadWriteBooleanLock(false);
- this.closeValidSleepTime = 5;
- this.closeStartTime = 0;
- }
- public void setReadLineNotify(IFileReadLineNotify readLineNotify) {
- this.readLineNotify = readLineNotify;
- }
- private boolean isRun(){
- return this.isRun.read().booleanValue();
- }
- private boolean isBreakWhile(){
- return this.isBreakWhile.read().booleanValue();
- }
- @Override
- public void run() {
- exec();
- }
- /**
- * 执行操作
- */
- private void exec(){
- while(this.isRun()){
- this.exec0();
- synchronized (this.isRun){
- try {
- if(!this.isRun() && this.files.size()==0){
- break;
- }
- this.isRun.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- this.isBreakWhile.write(true);
- this.exec0(); //再次验证,是否还有未处理的数据
- long endTime = System.currentTimeMillis(); //结束时间
- this.finishCloseLog(endTime-this.closeStartTime);
- }
- /**
- * 执行文件读取
- */
- private void exec0(){
- while(this.files.size()>0){
- ReadFileInfo file = this.files.remove(0);
- this.execRead(file);
- }
- }
- /**
- * 执行读取
- * @param file 文件信息
- */
- private void execRead(ReadFileInfo file){
- long startTime = System.currentTimeMillis(); //开始时间
- RandomAccessFile raf = null;
- try {
- raf = new RandomAccessFile(file.getFilePath(),"r");
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- }
- long lLine = 0; //行数
- while(true){
- String strLine = null;
- try {
- strLine = raf.readLine(); //读取一行
- if(strLine != null){
- String strLineData = this.formatCharsetData(strLine, file.getCharsetName()); //进行字符集转化
- this.readLineNotify.fileReadLineNotify(file.getFileKey(), file.getFilePath(), ++lLine, strLineData); //进行数据通知
- }else{
- raf.close();
- long endTime = System.currentTimeMillis(); //结束时间
- this.readLineNotify.fileReadLineFinishNotify(file.getFileKey(), file.getFilePath(), lLine, endTime-startTime);
- this.printLog(file.getFileKey(), file.getFilePath(), lLine,endTime-startTime, this.files.size());
- break;
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 打印日志
- * @param fileKey 文件KEY
- * @param filePath 文件路径
- * @param fileLines 文件行数
- * @param useTime 执行用时
- * @param noReadFiles 剩余没有读取的文件数
- */
- public void printLog(String fileKey, String filePath, long fileLines, long useTime, int noReadFiles){
- if(noReadFiles>0){
- log.info("FileKey: "+fileKey +" 文件路径: " + filePath +" 文件行数:" + String.valueOf(fileLines) + " 读取用时:"+ String.valueOf(useTime)+"ms. [未处理:" +String.valueOf(noReadFiles)+"个]");
- }else{
- log.info("FileKey: "+fileKey +" 文件路径: " + filePath +" 文件行数:" + String.valueOf(fileLines) + " 读取用时:"+ String.valueOf(useTime)+"ms. [处理完成]");
- }
- }
- /**
- * 格式化字符集数据
- * @param data 数据
- * @param charset 字符集
- * @return 格式化后的字符串数据
- */
- private String formatCharsetData(String data, String charset){
- String result = null;
- byte[] byteData = null;
- try {
- byteData = data.getBytes("ISO-8859-1");
- if(charset !=null && charset.length()>0){
- result = new String(byteData,charset);
- }
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- result = data;
- }
- return result;
- }
- /**
- * 读取文件
- * @param filePath 文件路径
- * @param charsetName 字符集名称
- */
- public void readFile(String filePath, String charsetName){
- this.readFile("SystemDefault", filePath, charsetName);
- }
- /**
- * 读取文件
- * @param fileKey 文件KEY
- * @param filePath 文件路径
- * @param charsetName 文件的字符集
- */
- public void readFile(String fileKey, String filePath, String charsetName){
- if(this.isRun()) {
- ReadFileInfo file = new ReadFileInfo();
- file.setCharsetName(charsetName);
- file.setFileKey(fileKey);
- file.setFilePath(filePath);
- this.files.add(file);
- synchronized (this.isRun) {
- this.isRun.notifyAll();
- }
- }
- }
- public void setCloseValidSleepTime(long closeValidSleepTime) {
- this.closeValidSleepTime = closeValidSleepTime;
- }
- public void beginCloseLog(){
- log.info("开始进行,文件读取线程, 关闭操作... ...");
- }
- public void finishCloseLog(long useTime){
- log.info("文件读取线程, 关闭完成. [用时:"+String.valueOf(useTime)+"ms]");
- }
- @Override
- public void close() {
- this.closeStartTime = System.currentTimeMillis(); //关闭操作,开始时间
- this.beginCloseLog();
- synchronized (this.isRun) {
- this.isRun.write(false);
- this.isRun.notifyAll();
- }
- while(!(this.files.size()==0 && this.isBreakWhile())) {
- try {
- Thread.sleep(this.closeValidSleepTime);
- if(!this.isBreakWhile()) {
- synchronized (this.isRun) {
- this.isRun.notifyAll();
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }