阻塞隊列是一個支持兩個附加操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法:java
阻塞隊列經常使用於生產者消費者的場景。其中生產者是向隊列添加元素的線程,消費者是從隊列取出元素的線程,阻塞隊列是存放和獲取元素的容器。git
阻塞隊列的4種處理方式:github
add(e)
當隊列滿,再插入元素,拋出異常remove()
當隊列空,再刪除元素,拋出異常element()
獲取元素offer(e)
插入元素時,插入成功返回truepoll()
移除元素,成功返回該值,不然返回nullpeek()
put(e)
當阻塞隊列滿時,再插入時會阻塞生產者線程,直到隊列可用或中斷退出take()
當隊列空,再移除元素會阻塞消費者線程,直到隊列不空offer(e, time, unit)
隊列滿時再插入元素,阻塞,超時退出poll(time, unit)
隊列空時移除元素,阻塞,超時退出ArrayBlockingQueue 是一個用數組實現的有界的,按照 FIFO 原則對元素排序的阻塞隊列。它還支持對等待的生產者和消費者線程進行排序時的可選公平策略,默認狀況下不保證線程公平的訪問,在構造時能夠選擇公平策略。公平性會下降吞吐量,可是減小了可變性和避免了「不平衡性」。算法
這是一個用鏈表實現的有界阻塞隊列,默認長度和最大長度都是 Integer.MAX_VALUE 。該隊列也是按照 FIFO 原則對元素排序,肯定線程執行的前後順序。編程
這是一個支持優先級的無界祖蘇隊列,默認狀況下采起天然順序升序排序,也能夠經過構造函數指定 Comparator 來對元素進行排序。可是它不能保證相同優先級元素的順序。數組
底層是採用二叉最大堆來實現優先級排序的。緩存
這是一個支持延時獲取元素的無界阻塞隊列,其隊列使用優先隊列 PriorityQueue 實現。隊列中的元素必須實現 Delayed 接口,建立元素時能夠指定多久以後才能從隊列中獲取該元素,只有在元素到期時才能獲取。併發
主要用於緩存,如清除緩衝中超時的數據。還用於定時任務的調度。框架
元素建立時,要實現 Delayed 接口,首先進行初始化;而後實現 getDelay(Timeunit unit)
方法,返回的值是當前元素還須要延時多長時間;最後實現compareTo(Delayed other)
方法,用來指定元素的順序。ide
當消費者從隊列中獲取元素時,若是元素尚未到延時時間,就阻塞當前線程。此外,設置了 leader 變量表示等待獲取隊列頭部元素的線程。若是 leader 不爲空,表示有現成等待獲取隊列頭部元素,使用 await() 方法讓當前線程等待信號。若是 leader 爲空,則把當前線程設置爲 leader,使用 awaitNanos() 方法讓當前線程等待接收信號或等待 delay 時間。
與其餘阻塞隊列不一樣,這是一個不存儲元素的阻塞隊列,每個 put 操做必需要等待一個take操做,不然不能繼續添加元素,反之亦然。分爲公平和不公平訪問隊列,默認狀況採用非公平性策略訪問隊列。
該種隊列自己不存儲任何元素,適合傳遞性場景,把生產者線程處理的數據直接傳遞給消費者線程,其吞吐量高於 LinkedBlockingQueue 和 ArrayBlockingQueue。
這是一個由鏈表結構組成的 FIFO 的無界阻塞 TransferQueue 隊列。它採起一種預佔模式,也就是有就直接拿走,沒有就佔着這個位置直到拿到、超時或中斷。相對於其餘阻塞隊列,多了 tryTransfer 方法和 transfer 方法。
transfer(e,[timeout,unit])
方法: 若是當前有消費者正等待接收元素,該方法能夠把生產者傳入的元素馬上傳輸給消費者。若是沒有消費者等待,該方法將元素存放在隊列的 tail 節點,等到該元素被消費者消費了才返回。tryTransfer(e,[timeout,unit])
方法: 試探生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,返回false。該方法不管消費者是否接收都當即返回,而 transfer 方法必須等消費了才返回。是一個由鏈表組成的雙向阻塞隊列。能夠從隊列兩端插入和移除元素。
該框架主要應用在並行計算中,把一個大人物分割成若干個小任務,最終彙總每一個小任務結果後獲得大結果的框架。Fork 就是把一個大任務切分紅若干子任務並行的執行,Join 就是合併這些子任務的執行結果,最終獲得這個大任務的結果。
工做竊取是指某個線程從其餘隊列裏竊取任務來執行。一般使用雙端隊列,被竊取任務線程永遠從雙端隊列頭部拿任務執行,竊取任務的線程永遠從雙端隊列尾部拿任務執行。
優勢是充分利用線程進行並行計算,減小了線程間的競爭。缺點是在某些狀況下存在競爭,好比隊列只有一個任務時,會消耗更多的資源。
首先,分割任務,將一個大任務分割成子任務,不停分割直到分割出的子任務足夠小。
而後,執行任務併合並結果。分割的子任務分別放在雙端隊列,而後幾個啓動線程分別從雙端隊列獲取任務執行。執行結果放在一個隊列裏,啓動一個線程從隊列拿數據,而後合併這些線程。
public class ForkJoinCase extends RecursiveTask<Integer> {
private final int threshold=5;
private int first;
private int last;
public ForkJoinCase(int first,int last){
this.first=first;
this.last=last;
}
@Override
protected Integer compute() {
int ret=0;
if(last-first<=threshold){//任務足夠小,執行
for(int i=first;i<=last;i++){
ret+=i;
}
}else{//分解任務
int mid=first+(last-first)/2;
ForkJoinCase leftTask=new ForkJoinCase(first,mid);
ForkJoinCase rightTask=new ForkJoinCase(mid+1,last);
//執行子任務
leftTask.fork();
rightTask.fork();
//合併子任務結果
ret=leftTask.join()+rightTask.join();
}
return ret;
}
}
複製代碼
參考資料