java多線程合併文件-生產者模式-多線程讀單線程寫

上次那個工做線程又讀又寫的感受在寫上面競爭太激烈了,因而試了試一個線程來寫。java

阻塞隊列維護讀取緩存,給定長度,雖然原本就假設忽略了內存問題,但至少要我能跑起來嘛。緩存

對此作了個簡單的不科學的不嚴謹的不靠譜的比較:安全

文件數(112k/個) 程序一:同讀寫(毫秒) 程序二:多讀一寫 程序一:每一個文件用時 程序二:每一個文件用時
10 267 179 26.7 17.9
50 684 387 13.68 7.74
100 1285 437 12.85 4.37
200 1829 1236 9.145 6.18
500 7162 1762 14.324 3.524
1000 12992 2284 12.992 2.284

 

一言不合貼代碼:app

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.FileHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

public class TxtMergePC extends Thread {

	/**
	 * 日誌
	 */
	private static Logger logger;

	/**
	 * 日誌路徑
	 */
	private static String logPath = "D:\\text\\log\\logPC.log";

	/**
	 * 日誌配置
	 */
	static {
		logger = Logger.getLogger("txtMerge");
		FileHandler fh;
		try {
			fh = new FileHandler(logPath, true);
			logger.addHandler(fh);// 日誌輸出文件
			logger.setLevel(Level.ALL);
			fh.setFormatter(new SimpleFormatter());// 輸出格式
		} catch (SecurityException e) {
			logger.log(Level.SEVERE, "安全性錯誤", e);
		} catch (IOException e) {
			logger.log(Level.SEVERE, "讀取文件日誌錯誤", e);
		}
	}

	private static void infoLog(String msg) {
		logger.log(Level.INFO, msg);
	}

	private static void errorLog(String msg, Throwable e) {
		logger.log(Level.SEVERE, msg, e);
	}

	/**
	 * 待合併文件夾路徑
	 */
	private String path;

	/**
	 * 輸出路徑(絕對路徑)
	 */
	private String outPath;

	/**
	 * 文件列表
	 */
	private File[] files;

	/**
	 * 剩餘未合併文件數
	 */
	private Integer fileNum;// 未處理的文件數

	private Integer dealFileNum = 0;// 處理文件數

	/**
	 * 工做線程總數
	 */
	private Integer threadNum;// 讀線程數

	private Integer queueSize;// 隊列容量

	private ArrayBlockingQueue<StringBuilder> queue;

	public TxtMergePC(String path, String outPath, int threadNum, int queueSize) {
		this.path = path;
		this.outPath = outPath;
		this.threadNum = threadNum;
		this.queueSize = queueSize;
		this.queue = new ArrayBlockingQueue<StringBuilder>(this.queueSize);
		this.files = new File(this.path).listFiles(new FilenameFilter() {

			@Override
			public boolean accept(File dir, String name) {
				if (name.endsWith(".txt"))
					return true;
				return false;
			}
		});
		this.fileNum = this.files.length;
		StringBuilder sBuilder = new StringBuilder(this.fileNum + "個文本文件, at:" + this.path);
		for (File f : this.files) {
			sBuilder.append("\n");
			sBuilder.append(f.getName());
		}
		infoLog(sBuilder.toString());
	}

	@Override
	public void run() {
		if (this.fileNum <= 0) {
			infoLog("無文件:" + this.path);
			return;
		}
		CountDownLatch doneSignal = new CountDownLatch(this.threadNum + 1);// 結束信號,用於關閉寫操做
		CountDownLatch startSignal = new CountDownLatch(1);// 開始執行信號
		ExecutorService threadPool = Executors.newCachedThreadPool();
		final FileWriter fileWriter;
		try {
			fileWriter = new FileWriter(outPath);
		} catch (IOException e) {
			errorLog("建立輸出文件失敗", e);
			return;
		}
		for (int i = 0; i < threadNum; i++) {
			threadPool.execute(new Worker(startSignal, doneSignal));
		}
		startSignal.countDown();// 開始
		long begin = new Date().getTime();
		threadPool.execute(new Thread(new Runnable() {

			@Override
			public void run() {
				infoLog("寫線程 啓動");
				int i = 1;
				while (dealFileNum < files.length) {
					StringBuilder sb = null;
					try {
						sb = queue.take();
						infoLog("寫線程 - " + (i++));
					} catch (InterruptedException e) {
						errorLog("取隊列緩存失敗", e);
					}
					if (sb == null)
						continue;
					try {
						fileWriter.write(sb.toString());
					} catch (IOException e) {
						errorLog("寫失敗", e);
					}
				}
				doneSignal.countDown();
				infoLog("寫線程 結束");
			}
		}));
		try {
			doneSignal.await();// 等待其餘線程結束
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			try {
				fileWriter.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		threadPool.shutdown();
		long end = new Date().getTime();
		infoLog("結束,用時:" + (end - begin) + " ms");
	}

	/**
	 * 獲取一個未處理的文件
	 * 
	 * @author LiuJie
	 * @time 2016年8月24日 下午10:43:15
	 * @return 處理完了返回null
	 */
	private File getFile() {
		if (0 == fileNum) {
			return null;
		}
		File file = null;
		synchronized (this.fileNum) {
			if (0 < fileNum) {
				file = this.files[this.fileNum - 1];
				this.fileNum--;
				infoLog(Thread.currentThread().getName() + " 操做" + file.getName() + ",剩餘:" + fileNum);
			}
		}
		return file;
	}

	/**
	 * 工做線程,容器中拿不出文件時結束
	 * 
	 * @author 劉傑
	 * @time 2016年8月24日 下午11:04:36
	 */
	private class Worker implements Runnable {

		private CountDownLatch startSignal;
		private CountDownLatch doneSignal;

		public Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
			this.startSignal = startSignal;
			this.doneSignal = doneSignal;
		}

		@Override
		public void run() {
			infoLog(Thread.currentThread().getName() + " 就緒");
			try {
				startSignal.await();// 等待開始執行信號的發佈
			} catch (InterruptedException e1) {
				errorLog("等待開始信號異常", e1);
			}
			infoLog(Thread.currentThread().getName() + " 開始");
			FileInputStream fis = null;
			InputStreamReader isr = null;
			BufferedReader br = null;
			while (true) {
				File file = getFile();
				if (null == file) {
					break;
				}
				try {
					fis = new FileInputStream(file);
					isr = new InputStreamReader(fis, "UTF-8");
					br = new BufferedReader(isr);
					String readStr = "";
					StringBuilder sb = new StringBuilder("\r\n" + file.getName() + "\r\n");
					while ((readStr = br.readLine()) != null) {
						sb.append(readStr);
					}
					try {
						queue.put(sb);
					} catch (InterruptedException e) {
						errorLog("加入緩存隊列失敗-" + file.getName(), e);
					}
				} catch (FileNotFoundException e) {
					errorLog("找不到指定文件", e);
				} catch (IOException e) {
					errorLog("讀取文件失敗", e);
				} finally {
					try {
						br.close();
						isr.close();
						fis.close();
					} catch (IOException e) {
						errorLog("關閉異常", e);
					}
					dealFileNum++;
				}
			}
			doneSignal.countDown();// 告訴「主線程」工做完成
			infoLog(Thread.currentThread().getName() + " 結束");
		}
	}

	public static void main(String[] args) {
		TxtMergePC txtMerge = new TxtMergePC("D:\\text", "D:\\text\\merge\\mergePC.txt", 5, 5);
		txtMerge.start();
	}

}
相關文章
相關標籤/搜索