点击(此处)折叠或打开

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;

  8. /**
  9.  * object: FileReadLineThread
  10.  * author: 程晓鹏
  11.  * description: 文件读取行线程类
  12.  * date: 2018-04-13 16:51
  13.  */
  14. public class FileReadLineThread extends Thread implements Closeable{
  15.     protected static final Logger log = Logger.getLogger(FileReadLineThread.class);
  16.     private ReadWriteBooleanLock isRun; //是否运行
  17.     private ReadWriteBooleanLock isBreakWhile; //是否跳出while循环
  18.     private IFileReadLineNotify readLineNotify;
  19.     private List<ReadFileInfo> files; //待处理文件集合
  20.     private long closeValidSleepTime; //关闭验证时休眠时间
  21.     private long closeStartTime; //关闭开始时间

  22.     /**
  23.      * 默认构造函数
  24.      */
  25.     public FileReadLineThread(){
  26.         this.setName(this.getClass().getName().toString()); //设置线程名
  27.         this.files = new ArrayList<ReadFileInfo>();
  28.         this.isRun = new ReadWriteBooleanLock(true);
  29.         this.isBreakWhile = new ReadWriteBooleanLock(false);
  30.         this.closeValidSleepTime = 5;
  31.         this.closeStartTime = 0;
  32.     }

  33.     public void setReadLineNotify(IFileReadLineNotify readLineNotify) {
  34.         this.readLineNotify = readLineNotify;
  35.     }

  36.     private boolean isRun(){
  37.         return this.isRun.read().booleanValue();
  38.     }

  39.     private boolean isBreakWhile(){
  40.         return this.isBreakWhile.read().booleanValue();
  41.     }

  42.     @Override
  43.     public void run() {
  44.         exec();
  45.     }

  46.     /**
  47.      * 执行操作
  48.      */
  49.     private void exec(){
  50.         while(this.isRun()){
  51.             this.exec0();
  52.             synchronized (this.isRun){
  53.                 try {
  54.                     if(!this.isRun() && this.files.size()==0){
  55.                         break;
  56.                     }
  57.                     this.isRun.wait();
  58.                 } catch (InterruptedException e) {
  59.                     e.printStackTrace();
  60.                 }
  61.             }
  62.         }

  63.         this.isBreakWhile.write(true);
  64.         this.exec0(); //再次验证,是否还有未处理的数据
  65.         long endTime = System.currentTimeMillis(); //结束时间
  66.         this.finishCloseLog(endTime-this.closeStartTime);
  67.     }

  68.     /**
  69.      * 执行文件读取
  70.      */
  71.     private void exec0(){
  72.         while(this.files.size()>0){
  73.             ReadFileInfo file = this.files.remove(0);
  74.             this.execRead(file);
  75.         }
  76.     }

  77.     /**
  78.      * 执行读取
  79.      * @param file 文件信息
  80.      */
  81.     private void execRead(ReadFileInfo file){
  82.         long startTime = System.currentTimeMillis(); //开始时间
  83.         RandomAccessFile raf = null;
  84.         try {
  85.             raf = new RandomAccessFile(file.getFilePath(),"r");
  86.         } catch (FileNotFoundException e) {
  87.             e.printStackTrace();
  88.         }
  89.         long lLine = 0; //行数
  90.         while(true){
  91.             String strLine = null;
  92.             try {
  93.                 strLine = raf.readLine(); //读取一行
  94.                 if(strLine != null){
  95.                     String strLineData = this.formatCharsetData(strLine, file.getCharsetName()); //进行字符集转化
  96.                     this.readLineNotify.fileReadLineNotify(file.getFileKey(), file.getFilePath(), ++lLine, strLineData); //进行数据通知
  97.                 }else{
  98.                     raf.close();
  99.                     long endTime = System.currentTimeMillis(); //结束时间
  100.                     this.readLineNotify.fileReadLineFinishNotify(file.getFileKey(), file.getFilePath(), lLine, endTime-startTime);
  101.                     this.printLog(file.getFileKey(), file.getFilePath(), lLine,endTime-startTime, this.files.size());
  102.                     break;
  103.                 }
  104.             } catch (IOException e) {
  105.                 e.printStackTrace();
  106.             }
  107.         }
  108.     }

  109.     /**
  110.      * 打印日志
  111.      * @param fileKey 文件KEY
  112.      * @param filePath 文件路径
  113.      * @param fileLines 文件行数
  114.      * @param useTime 执行用时
  115.      * @param noReadFiles 剩余没有读取的文件数
  116.      */
  117.     public void printLog(String fileKey, String filePath, long fileLines, long useTime, int noReadFiles){
  118.         if(noReadFiles>0){
  119.             log.info("FileKey: "+fileKey +" 文件路径: " + filePath +" 文件行数:" + String.valueOf(fileLines) + " 读取用时:"+ String.valueOf(useTime)+"ms. [未处理:" +String.valueOf(noReadFiles)+"个]");
  120.         }else{
  121.             log.info("FileKey: "+fileKey +" 文件路径: " + filePath +" 文件行数:" + String.valueOf(fileLines) + " 读取用时:"+ String.valueOf(useTime)+"ms. [处理完成]");
  122.         }
  123.     }

  124.     /**
  125.      * 格式化字符集数据
  126.      * @param data 数据
  127.      * @param charset 字符集
  128.      * @return 格式化后的字符串数据
  129.      */
  130.     private String formatCharsetData(String data, String charset){
  131.         String result = null;
  132.         byte[] byteData = null;
  133.         try {
  134.             byteData = data.getBytes("ISO-8859-1");
  135.             if(charset !=null && charset.length()>0){
  136.                 result = new String(byteData,charset);
  137.             }
  138.         } catch (UnsupportedEncodingException e) {
  139.             e.printStackTrace();
  140.             result = data;
  141.         }

  142.         return result;
  143.     }

  144.     /**
  145.      * 读取文件
  146.      * @param filePath 文件路径
  147.      * @param charsetName 字符集名称
  148.      */
  149.     public void readFile(String filePath, String charsetName){
  150.         this.readFile("SystemDefault", filePath, charsetName);
  151.     }

  152.     /**
  153.      * 读取文件
  154.      * @param fileKey 文件KEY
  155.      * @param filePath 文件路径
  156.      * @param charsetName 文件的字符集
  157.      */
  158.     public void readFile(String fileKey, String filePath, String charsetName){
  159.         if(this.isRun()) {
  160.             ReadFileInfo file = new ReadFileInfo();
  161.             file.setCharsetName(charsetName);
  162.             file.setFileKey(fileKey);
  163.             file.setFilePath(filePath);
  164.             this.files.add(file);

  165.             synchronized (this.isRun) {
  166.                 this.isRun.notifyAll();
  167.             }
  168.         }
  169.     }

  170.     public void setCloseValidSleepTime(long closeValidSleepTime) {
  171.         this.closeValidSleepTime = closeValidSleepTime;
  172.     }

  173.     public void beginCloseLog(){
  174.         log.info("开始进行,文件读取线程, 关闭操作... ...");
  175.     }

  176.     public void finishCloseLog(long useTime){
  177.         log.info("文件读取线程, 关闭完成. [用时:"+String.valueOf(useTime)+"ms]");
  178.     }

  179.     @Override
  180.     public void close() {
  181.         this.closeStartTime = System.currentTimeMillis(); //关闭操作,开始时间
  182.         this.beginCloseLog();
  183.         synchronized (this.isRun) {
  184.             this.isRun.write(false);
  185.             this.isRun.notifyAll();
  186.         }

  187.         while(!(this.files.size()==0 && this.isBreakWhile())) {
  188.             try {
  189.                 Thread.sleep(this.closeValidSleepTime);
  190.                 if(!this.isBreakWhile()) {
  191.                     synchronized (this.isRun) {
  192.                         this.isRun.notifyAll();
  193.                     }
  194.                 }
  195.             } catch (InterruptedException e) {
  196.                 e.printStackTrace();
  197.             }
  198.         }
  199.     }

  200. }

11-27 19:35