前面幾篇文章看到了造成Java併發程序設計基礎的底層構建塊。然而,實際編程中,應該儘量遠離底層結構。使用由併發處理的專業人士實現的較高層次的結構要方便的多,並且也安全的多。 java
對於多線程問題,能夠經過使用一個或多個隊列以優雅且安全的方式將其形式化。生產者線程向隊列插入元素,消費者線程則取出它們。使用隊列,能夠安全地從一個線程向另外一個線程傳遞數據。例如,考慮銀行轉賬程序,轉帳線程將轉帳指令對象插入一個隊列中,而不是直接訪問銀行對象。另外一個線程從隊列中取出指令執行轉帳。只有該線程能夠訪問該銀行對象的內部,所以不須要同步(鎖和條件是線程安全的隊列類的實現者須要考慮的)。 編程
當試圖向隊列添加元素而隊列已滿,或是想從隊列移出元素而隊列爲空的時候,阻塞隊列(blocking queue)致使線程阻塞。在協調多個線程之間的合做時,阻塞隊列是一個有用的工具。工做者線程能夠週期性地將中間結果存儲在阻塞隊列中。其它的工做者線程移出中間結果並進一步加以修改。隊列會自動地平衡負載。若是第一個線程集運行的比第二個慢,第二個線程集在等待結果時會阻塞。若是第一個線程集運行的快,他將等待第二個隊列集遇上來。 安全
阻塞隊列的方法: 多線程
方法 併發 |
正常動做 ide |
特殊狀況下的動做 工具 |
add 性能 |
添加一個元素 this |
若是隊列滿,則拋出IllegalStateException異常 spa |
element |
返回隊列的頭元素 |
若是隊列空,拋出NoSuchElementException異常 |
offer |
添加一個元素並返回true |
若是隊列滿,返回false |
peek |
返回隊列的頭元素 |
若是隊列空,則返回null |
poll |
移出並返回隊列的頭元素 |
若是隊列空,則返回null |
put |
添加一個元素 |
若是隊列滿,則阻塞 |
remove |
移出並返回頭元素 |
若是隊列空,則拋出NoSuchElementException異常 |
take |
移出並返回頭元素 |
若是隊列空,則阻塞 |
如上所述,阻塞隊列方法分爲3類,這取決於當隊列滿或空時它們的響應方式。若是將隊列看成線程管理工具來使用,將要用到put和take方法。當試圖向滿的隊列中添加或從空的隊列中移出元素時,add、remove和element操做拋出異常。固然,在一個多線程程序中,隊列會在任什麼時候刻滿或空,所以,必定要使用offer、poll和peek方法做爲替代。這些方法若是不能完成任務,只是給出一個錯誤提示而不會拋出異常。須要注意的一點是,poll和peek方法返回null來指示失敗,所以,向這些隊列中插入null值是非法的。
offer和poll方法還有帶有超時的方法變體。例如,下面的調用:
boolean success = q.offer(x,100,TimeUtil.MILLISECONDS);
嘗試在100毫秒的時間內在隊列的尾部插入一個元素。若是成功返回true,不然,達到超時時,返回false。相似地,下面的調用:
Object head = q.poll(100,TimeUnit.MILLISECONDS);
嘗試用100毫秒的時間移出隊列的頭元素,若是成功返回頭元素,不然,達到超時時,返回null。
java.util.concurrent包提供了阻塞隊列的幾個變種。默認狀況下,LinkedBlockingQueue的容量是沒有上邊界的,可是,也能夠選擇指定最大容量。LinkedBlockingDeque是一個雙端的版本。ArrayBlockingQueue在構造時須要指定容量,而且有一個可選的參數來指定是否須要公平性。若設置了公平參數,則那麼等待了最長時間的線程會優先獲得處理。一般,公平性會下降性能,只有在確實很是須要時才使用它。
PriorityBlockingQueue是一個帶優先級的隊列,而不是先進先出隊列。元素按照它們的優先級順序被移出。該隊列是沒有容量上限的,可是,若是隊列是空的,取元素的操做會阻塞。
最後,DelayQueue包含實現Delayed接口的對象:
interface Delayed extends Comparable<Delayed>{ long getDelay(TimeUnit unit); }
getDelay方法返回對象的殘留延遲。負值表示延遲已經結束。元素只有在延遲用完的狀況下才能從DelayQueue移除。還必須實現compareTo方法,DelayQueue使用該方法對元素進行排序。
下面是一個使用阻塞隊列來控制線程集的例子,在一個目錄及它的全部子目錄下搜索全部文件,打印出包含指定關鍵字的行。
public class BlockingQueueTest { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub Scanner in = new Scanner(System.in); System.out.print("Enter base directory(e.g. /usr/local/jdk1.6.0/src):"); String directory = in.nextLine(); System.out.print("Enter keyword (e.g. volatile):"); String keyword = in.nextLine(); final int FILE_QUEUE_SIZE = 10; final int SEARCH_THREADS = 100; BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE); FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory)); new Thread(enumerator).start(); for (int i = 1; i <= SEARCH_THREADS; i++) { new Thread(new SearchTask(queue, keyword)).start(); } } }
public class FileEnumerationTask implements Runnable { private BlockingQueue<File> queue; private File startingDirectory; public static File DUMMY = new File(""); public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory){ this.queue = queue; this.startingDirectory = startingDirectory; } @Override public void run() { try { enumerate(startingDirectory); queue.put(DUMMY); } catch (InterruptedException e) { e.printStackTrace(); } } public void enumerate(File directory) throws InterruptedException{ File[] files = directory.listFiles(); for(File file : files){ if (file.isDirectory()) { enumerate(file); }else { queue.put(file); } } } }
public class SearchTask implements Runnable { private BlockingQueue<File> queue; private String keyword; public SearchTask(BlockingQueue<File> queue,String keyword){ this.queue = queue; this.keyword = keyword; } @Override public void run() { try { boolean done = false; while (!done) { File file = queue.take(); if (file == FileEnumerationTask.DUMMY) { queue.put(file); done = true; }else { search(file); } } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void search(File file) throws FileNotFoundException { // TODO Auto-generated method stub Scanner in = new Scanner(new FileInputStream(file)); int lineNumber = 0; while (in.hasNextLine()) { lineNumber++; String line = in.nextLine(); if(line.contains(keyword)) System.out.printf("%s:%d:%s%n",file.getPath(),lineNumber,line); } in.close(); } }