阻塞隊列

       前面幾篇文章看到了造成Java併發程序設計基礎的底層構建塊。然而,實際編程中,應該儘量遠離底層結構。使用由併發處理的專業人士實現的較高層次的結構要方便的多,並且也安全的多。 java

    對於多線程問題,能夠經過使用一個或多個隊列以優雅且安全的方式將其形式化。生產者線程向隊列插入元素,消費者線程則取出它們。使用隊列,能夠安全地從一個線程向另外一個線程傳遞數據。例如,考慮銀行轉賬程序,轉帳線程將轉帳指令對象插入一個隊列中,而不是直接訪問銀行對象。另外一個線程從隊列中取出指令執行轉帳。只有該線程能夠訪問該銀行對象的內部,所以不須要同步(鎖和條件是線程安全的隊列類的實現者須要考慮的)。 編程

        當試圖向隊列添加元素而隊列已滿,或是想從隊列移出元素而隊列爲空的時候,阻塞隊列(blocking queue)致使線程阻塞。在協調多個線程之間的合做時,阻塞隊列是一個有用的工具。工做者線程能夠週期性地將中間結果存儲在阻塞隊列中。其它的工做者線程移出中間結果並進一步加以修改。隊列會自動地平衡負載。若是第一個線程集運行的比第二個慢,第二個線程集在等待結果時會阻塞。若是第一個線程集運行的快,他將等待第二個隊列集遇上來。 安全

    阻塞隊列的方法: 多線程

 方法 併發

正常動做 ide

特殊狀況下的動做 工具

add 性能

添加一個元素 this

若是隊列滿,則拋出IllegalStateException異常 spa

element

返回隊列的頭元素

若是隊列空,拋出NoSuchElementException異常

offer

添加一個元素並返回true

若是隊列滿,返回false

peek

返回隊列的頭元素

若是隊列空,則返回null

poll

移出並返回隊列的頭元素

若是隊列空,則返回null

put

添加一個元素

若是隊列滿,則阻塞

remove

移出並返回頭元素

若是隊列空,則拋出NoSuchElementException異常

take

移出並返回頭元素

若是隊列空,則阻塞

        如上所述,阻塞隊列方法分爲3類,這取決於當隊列滿或空時它們的響應方式。若是將隊列看成線程管理工具來使用,將要用到put和take方法。當試圖向滿的隊列中添加或從空的隊列中移出元素時,add、remove和element操做拋出異常。固然,在一個多線程程序中,隊列會在任什麼時候刻滿或空,所以,必定要使用offer、poll和peek方法做爲替代。這些方法若是不能完成任務,只是給出一個錯誤提示而不會拋出異常。須要注意的一點是,poll和peek方法返回null來指示失敗,所以,向這些隊列中插入null值是非法的。

        offer和poll方法還有帶有超時的方法變體。例如,下面的調用:


    boolean success = q.offer(x,100,TimeUtil.MILLISECONDS);


嘗試在100毫秒的時間內在隊列的尾部插入一個元素。若是成功返回true,不然,達到超時時,返回false。相似地,下面的調用:


Object head = q.poll(100,TimeUnit.MILLISECONDS);


嘗試用100毫秒的時間移出隊列的頭元素,若是成功返回頭元素,不然,達到超時時,返回null。

        java.util.concurrent包提供了阻塞隊列的幾個變種。默認狀況下,LinkedBlockingQueue的容量是沒有上邊界的,可是,也能夠選擇指定最大容量。LinkedBlockingDeque是一個雙端的版本。ArrayBlockingQueue在構造時須要指定容量,而且有一個可選的參數來指定是否須要公平性。若設置了公平參數,則那麼等待了最長時間的線程會優先獲得處理。一般,公平性會下降性能,只有在確實很是須要時才使用它。

        PriorityBlockingQueue是一個帶優先級的隊列,而不是先進先出隊列。元素按照它們的優先級順序被移出。該隊列是沒有容量上限的,可是,若是隊列是空的,取元素的操做會阻塞。

        最後,DelayQueue包含實現Delayed接口的對象:


interface Delayed extends Comparable<Delayed>{
    long getDelay(TimeUnit unit);
}


        getDelay方法返回對象的殘留延遲。負值表示延遲已經結束。元素只有在延遲用完的狀況下才能從DelayQueue移除。還必須實現compareTo方法,DelayQueue使用該方法對元素進行排序。

    下面是一個使用阻塞隊列來控制線程集的例子,在一個目錄及它的全部子目錄下搜索全部文件,打印出包含指定關鍵字的行。


public class BlockingQueueTest {
    /**
     * @param args
     */
    public static void main(String[] args) {
	// TODO Auto-generated method stub
	Scanner in = new Scanner(System.in);
	System.out.print("Enter base directory(e.g. /usr/local/jdk1.6.0/src):");
	String directory = in.nextLine();
	System.out.print("Enter keyword (e.g. volatile):");
	String keyword = in.nextLine();
		
	final int FILE_QUEUE_SIZE = 10;
	final int SEARCH_THREADS = 100;
	BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
		
	FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
	new Thread(enumerator).start();
	for (int i = 1; i <= SEARCH_THREADS; i++) {
            new Thread(new SearchTask(queue, keyword)).start();
	}
    }
}


public class FileEnumerationTask implements Runnable {
    private BlockingQueue<File> queue;
    private File startingDirectory;
    public static File DUMMY = new File("");
	
    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory){
	this.queue = queue;
	this.startingDirectory = startingDirectory;
    }
	
    @Override
    public void run() {
        try {
	    enumerate(startingDirectory);
	    queue.put(DUMMY);
	} catch (InterruptedException e) {
	    e.printStackTrace();
	}
    }
	
    public void enumerate(File directory) throws InterruptedException{
        File[] files = directory.listFiles();
        for(File file : files){
            if (file.isDirectory()) {
                enumerate(file);
            }else {
                queue.put(file);
            }
        }
    }
}
public class SearchTask implements Runnable {
    private BlockingQueue<File> queue;
    private String keyword;
	
    public SearchTask(BlockingQueue<File> queue,String keyword){
	this.queue = queue;
	this.keyword = keyword;
    }

    @Override
    public void run() {
        try {
	    boolean done = false;
	    while (!done) {
	        File file = queue.take();
		if (file == FileEnumerationTask.DUMMY) {
		    queue.put(file);
		    done = true;
		}else {
		    search(file);
		}
            }
	} catch (InterruptedException e) {
	    // TODO Auto-generated catch block
	    e.printStackTrace();
	} catch (FileNotFoundException e) {
	    // TODO Auto-generated catch block
	    e.printStackTrace();
	}
    }

    private void search(File file) throws FileNotFoundException {
        // TODO Auto-generated method stub
	Scanner in = new Scanner(new FileInputStream(file));
	int lineNumber = 0;
	while (in.hasNextLine()) {
	    lineNumber++;
	    String line = in.nextLine();
	    if(line.contains(keyword))
	        System.out.printf("%s:%d:%s%n",file.getPath(),lineNumber,line);
        }
	in.close();
    }
}
相關文章
相關標籤/搜索