Java-BlockingQueue的使用

每次都是隔很長時間纔在博客中寫點什麼,說本身忙吧,這是給本身的一個藉口,其實呢仍是懶啊。哎。。。html

最近項目中有個對比的需求,須要從日誌文件中獲取到參數,而後調用不一樣的API,進行結果的對比。可是不知用什麼方式比較好,因而查了下jdk的手冊,發現了BlockingQueue這個好東西。java

關於BlockingQueue的介紹,你們有興趣的能夠本身看下:http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.htmlapi

 

需求呢其實很簡單就是將參數放置到Queue中,而後交由下一個策略去消費。剛開始時是經過不一樣的線程往隊列中存放數據,而後返回給下個服務一個BlockingQueue的對象,下一個策略從隊列中消費,code以下:安全

@SuppressWarnings("rawtypes")
	@Override
	public BlockingQueue getTxtLogContent(String path) {
		File file = new File(path);
		BufferedReader reader = null;
		String tempStr = null;
		final BlockingQueue queue = new LinkedBlockingQueue();
		try {
			reader = new BufferedReader(new FileReader(file));
			while ((tempStr = reader.readLine()) != null) {
				final InputOutputPrameters parameter = new InputOutputPrameters();
				String[] list = tempStr.split(";");
				if (list != null && list.length > 0) {
					parameter.setInputParamters(list[0]);
					parameter.setOutputParameters(list[1]);
				}
				new Thread(){
					@SuppressWarnings("unchecked")
					public void run(){
						try {	
							Thread.sleep((long)(Math.random()*100));
							log.info("開始存入數據!");
							queue.put(parameter);
							log.info("已經存入數據,目前隊列中有 " + queue.size() +" 個隊列!輸入參數:"+ parameter.getInputParamters() + ";\n輸出參數:" + parameter.getOutputParameters());
						} catch (Exception e) {
							log.error("系統異常:" + e);
						}
					}
				}.start();
			}
			reader.close();
		} catch (FileNotFoundException  e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally {
			 if (reader != null) {
				 try {
					 reader.close();
				 } catch (Exception e) {
					 e.printStackTrace();
				 }
			 }
		 }
		return queue;
	}

  但是在實際運行時,因爲日誌比較大,下一個策略可能要等1hour或更長的時間才能開始處理,這明顯是不符合要求的,因而又優化了下,將BlockingQueue改成全局static的,而後下一個策略能夠直接監控這個隊列中是否有值,有值就消費,沒值就阻塞線程等待或者超時等其餘處理。oracle

改進後的code:dom

一、新建一個隊列類:ide

public class ParameterQueue extends LinkedBlockingQueue<InputOutputPrameters> {

	/** 
	*@Fields serialVersionUID: 
	*/
	private static final long serialVersionUID = 6032356446145302484L;
	
	private static BlockingQueue<InputOutputPrameters> queue = new LinkedBlockingQueue<InputOutputPrameters>();

	/**
	 * @Fields log: 日誌記錄
	 */
	private static final Logger log = LoggerFactory
			.getLogger(ParameterQueue.class);
	
	/** 
	 * 獲取隊列中的對象 
	 * @Method:getParameter
	 * @Description: 獲取隊列中的對象 
	 * @return 獲取到的對象信息
	*/
	public static InputOutputPrameters getParameter(){
		InputOutputPrameters result = null;
		try {
			result  = (InputOutputPrameters)queue.take();
		} catch (Exception e) {
			log.error("獲取隊列異常,異常信息:" + e);
		}
		return result;
	}
	
	/** 
	 * 獲取隊列的數量
	 * @Method:getQueueSize
	 * @Description: 獲取隊列的數量 
	 * @return 數量
	*/
	public static Integer getQueueSize() {
		return queue.size();
	}
	
	/** 
	 * 放置參數到隊列中 
	 * @Method:putParameter
	 * @Description: 放置參數到隊列中 
	 * @param parameter 要放置的對象
	*/
	public static void putParameter(InputOutputPrameters parameter) {
		try {
			queue.put(parameter);
		} catch (Exception e) {
			log.error("插入隊列異常,異常信息:" + e);
		}
	}
}

  二、讀取文件時,直接操做該隊列,往隊列中put值,下一個策略從該隊列中get值,put的code以下:優化

public void getSource(String path) {
		try {
			File file = new File(path);
			BufferedReader reader = null;
			String tempStr = null;
			try {
				reader = new BufferedReader(new FileReader(file));
				while ((tempStr = reader.readLine()) != null) {
					final InputOutputPrameters parameter = new InputOutputPrameters();
					String[] list = tempStr.split(";");
					if (list != null && list.length > 0) {
						parameter.setInputParamters(list[0]);
						parameter.setOutputParameters(list[1]);
					}
					putInQueue(parameter);
				}
				reader.close();
			} catch (FileNotFoundException  e) {
				e.printStackTrace();
			} catch (IOException e) {
				e.printStackTrace();
			}finally {
				 if (reader != null) {
					 try {
						 reader.close();
					 } catch (Exception e) {
						 e.printStackTrace();
					 }
				 }
			 }
		} catch (Exception e) {
			log.error("系統異常: " + e);
		}
	}

	/** 
	 * 將參數存放至隊列中 
	 * @Method:putInQueue
	 * @Description: 將參數存放至隊列中 
	 * @param parameter 要存放的對象
	*/
	private void putInQueue(final InputOutputPrameters parameter) {
		new Thread(){
			public void run(){
				try {	
					Thread.sleep((long)(Math.random()*100));
					log.info("開始存入數據!");
					ParameterQueue.putParameter(parameter);
					log.info("已經存入數據,目前隊列中有 " + ParameterQueue.getQueueSize() +" 個隊列!輸入參數:"+ parameter.getInputParamters() + ";\n輸出參數:" + parameter.getOutputParameters());
				} catch (Exception e) {
					log.error("系統異常:" + e);
				}
			}
		}.start();
	}

  

因而這個要求就達到了。記錄下這個小需求,方便之後查閱。線程

簡要說下,BlockingQueue是線程安全的,經常使用的是ArrayBlockingQueue、LinkedBlockingQueue日誌

ArrayBlockingQueue須要制定容量,而LinkedBlockingQueue不須要

同時在消費時,take()是會阻塞線程的,若是是單線程跑時,take()不到時整個線程就卡了

因此看具體環境需求,是用take仍是其餘的,我通常用poll,由於能夠制定超時時間。

哎 不知道怎麼寫了,就這樣吧。

相關文章
相關標籤/搜索