上次那個工做線程又讀又寫的感受在寫上面競爭太激烈了,因而試了試一個線程來寫。java
阻塞隊列維護讀取緩存,給定長度,雖然原本就假設忽略了內存問題,但至少要我能跑起來嘛。緩存
對此作了個簡單的不科學的不嚴謹的不靠譜的比較:安全
文件數(112k/個) | 程序一:同讀寫(毫秒) | 程序二:多讀一寫 | 程序一:每一個文件用時 | 程序二:每一個文件用時 |
10 | 267 | 179 | 26.7 | 17.9 |
50 | 684 | 387 | 13.68 | 7.74 |
100 | 1285 | 437 | 12.85 | 4.37 |
200 | 1829 | 1236 | 9.145 | 6.18 |
500 | 7162 | 1762 | 14.324 | 3.524 |
1000 | 12992 | 2284 | 12.992 | 2.284 |
一言不合貼代碼:app
import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStreamReader; import java.util.Date; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.FileHandler; import java.util.logging.Level; import java.util.logging.Logger; import java.util.logging.SimpleFormatter; public class TxtMergePC extends Thread { /** * 日誌 */ private static Logger logger; /** * 日誌路徑 */ private static String logPath = "D:\\text\\log\\logPC.log"; /** * 日誌配置 */ static { logger = Logger.getLogger("txtMerge"); FileHandler fh; try { fh = new FileHandler(logPath, true); logger.addHandler(fh);// 日誌輸出文件 logger.setLevel(Level.ALL); fh.setFormatter(new SimpleFormatter());// 輸出格式 } catch (SecurityException e) { logger.log(Level.SEVERE, "安全性錯誤", e); } catch (IOException e) { logger.log(Level.SEVERE, "讀取文件日誌錯誤", e); } } private static void infoLog(String msg) { logger.log(Level.INFO, msg); } private static void errorLog(String msg, Throwable e) { logger.log(Level.SEVERE, msg, e); } /** * 待合併文件夾路徑 */ private String path; /** * 輸出路徑(絕對路徑) */ private String outPath; /** * 文件列表 */ private File[] files; /** * 剩餘未合併文件數 */ private Integer fileNum;// 未處理的文件數 private Integer dealFileNum = 0;// 處理文件數 /** * 工做線程總數 */ private Integer threadNum;// 讀線程數 private Integer queueSize;// 隊列容量 private ArrayBlockingQueue<StringBuilder> queue; public TxtMergePC(String path, String outPath, int threadNum, int queueSize) { this.path = path; this.outPath = outPath; this.threadNum = threadNum; this.queueSize = queueSize; this.queue = new ArrayBlockingQueue<StringBuilder>(this.queueSize); this.files = new File(this.path).listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { if (name.endsWith(".txt")) return true; return false; } }); this.fileNum = this.files.length; StringBuilder sBuilder = new StringBuilder(this.fileNum + "個文本文件, at:" + this.path); for (File f : this.files) { sBuilder.append("\n"); sBuilder.append(f.getName()); } infoLog(sBuilder.toString()); } @Override public void run() { if (this.fileNum <= 0) { infoLog("無文件:" + this.path); return; } CountDownLatch doneSignal = new CountDownLatch(this.threadNum + 1);// 結束信號,用於關閉寫操做 CountDownLatch startSignal = new CountDownLatch(1);// 開始執行信號 ExecutorService threadPool = Executors.newCachedThreadPool(); final FileWriter fileWriter; try { fileWriter = new FileWriter(outPath); } catch (IOException e) { errorLog("建立輸出文件失敗", e); return; } for (int i = 0; i < threadNum; i++) { threadPool.execute(new Worker(startSignal, doneSignal)); } startSignal.countDown();// 開始 long begin = new Date().getTime(); threadPool.execute(new Thread(new Runnable() { @Override public void run() { infoLog("寫線程 啓動"); int i = 1; while (dealFileNum < files.length) { StringBuilder sb = null; try { sb = queue.take(); infoLog("寫線程 - " + (i++)); } catch (InterruptedException e) { errorLog("取隊列緩存失敗", e); } if (sb == null) continue; try { fileWriter.write(sb.toString()); } catch (IOException e) { errorLog("寫失敗", e); } } doneSignal.countDown(); infoLog("寫線程 結束"); } })); try { doneSignal.await();// 等待其餘線程結束 } catch (InterruptedException e) { e.printStackTrace(); } finally { try { fileWriter.close(); } catch (IOException e) { e.printStackTrace(); } } threadPool.shutdown(); long end = new Date().getTime(); infoLog("結束,用時:" + (end - begin) + " ms"); } /** * 獲取一個未處理的文件 * * @author LiuJie * @time 2016年8月24日 下午10:43:15 * @return 處理完了返回null */ private File getFile() { if (0 == fileNum) { return null; } File file = null; synchronized (this.fileNum) { if (0 < fileNum) { file = this.files[this.fileNum - 1]; this.fileNum--; infoLog(Thread.currentThread().getName() + " 操做" + file.getName() + ",剩餘:" + fileNum); } } return file; } /** * 工做線程,容器中拿不出文件時結束 * * @author 劉傑 * @time 2016年8月24日 下午11:04:36 */ private class Worker implements Runnable { private CountDownLatch startSignal; private CountDownLatch doneSignal; public Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } @Override public void run() { infoLog(Thread.currentThread().getName() + " 就緒"); try { startSignal.await();// 等待開始執行信號的發佈 } catch (InterruptedException e1) { errorLog("等待開始信號異常", e1); } infoLog(Thread.currentThread().getName() + " 開始"); FileInputStream fis = null; InputStreamReader isr = null; BufferedReader br = null; while (true) { File file = getFile(); if (null == file) { break; } try { fis = new FileInputStream(file); isr = new InputStreamReader(fis, "UTF-8"); br = new BufferedReader(isr); String readStr = ""; StringBuilder sb = new StringBuilder("\r\n" + file.getName() + "\r\n"); while ((readStr = br.readLine()) != null) { sb.append(readStr); } try { queue.put(sb); } catch (InterruptedException e) { errorLog("加入緩存隊列失敗-" + file.getName(), e); } } catch (FileNotFoundException e) { errorLog("找不到指定文件", e); } catch (IOException e) { errorLog("讀取文件失敗", e); } finally { try { br.close(); isr.close(); fis.close(); } catch (IOException e) { errorLog("關閉異常", e); } dealFileNum++; } } doneSignal.countDown();// 告訴「主線程」工做完成 infoLog(Thread.currentThread().getName() + " 結束"); } } public static void main(String[] args) { TxtMergePC txtMerge = new TxtMergePC("D:\\text", "D:\\text\\merge\\mergePC.txt", 5, 5); txtMerge.start(); } }