Java併發6:阻塞隊列,Fork/Join框架

阻塞隊列

阻塞隊列是一個支持兩個附加操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法:java

  • 支持阻塞的插入方法:隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿
  • 支持阻塞的移除方法:隊列空時,獲取元素的線程會等待隊列變爲非空

阻塞隊列經常使用於生產者消費者的場景。其中生產者是向隊列添加元素的線程,消費者是從隊列取出元素的線程,阻塞隊列是存放和獲取元素的容器。git

阻塞隊列的4種處理方式:github

  1. 拋出異常:
    • add(e) 當隊列滿,再插入元素,拋出異常
    • remove() 當隊列空,再刪除元素,拋出異常
    • element() 獲取元素
  2. 返回特殊值:
    • offer(e) 插入元素時,插入成功返回true
    • poll() 移除元素,成功返回該值,不然返回null
    • peek()
  3. 一直阻塞:
    • put(e) 當阻塞隊列滿時,再插入時會阻塞生產者線程,直到隊列可用或中斷退出
    • take() 當隊列空,再移除元素會阻塞消費者線程,直到隊列不空
  4. 超時退出
    • offer(e, time, unit) 隊列滿時再插入元素,阻塞,超時退出
    • poll(time, unit) 隊列空時移除元素,阻塞,超時退出

Java中幾種阻塞隊列

  • ArrayBlockingQueue: 數組結構構成的有界 FIFO 阻塞隊列
  • LinkedBolckingQueue: 鏈表結構構成的有界 FIFO 阻塞隊列
  • PriorityBlockingQueue: 支持優先級排序的無界阻塞隊列
  • DelayQueue: 支持延時獲取元素,使用優先級隊列實現的無界阻塞隊列
  • SynchronousQueue: 不存儲元素的阻塞隊列,不爲隊列元素維護存儲空間
  • LinkedTransferQueue: 鏈表結構構成的無界阻塞隊列
  • LinkedBlockingDeque: 鏈表構成的雙向阻塞隊列

ArrayBlockingQueue

ArrayBlockingQueue 是一個用數組實現的有界的,按照 FIFO 原則對元素排序的阻塞隊列。它還支持對等待的生產者和消費者線程進行排序時的可選公平策略,默認狀況下不保證線程公平的訪問,在構造時能夠選擇公平策略。公平性會下降吞吐量,可是減小了可變性和避免了「不平衡性」。算法

LinkedBlockingQueue

這是一個用鏈表實現的有界阻塞隊列,默認長度和最大長度都是 Integer.MAX_VALUE 。該隊列也是按照 FIFO 原則對元素排序,肯定線程執行的前後順序。編程

PriorityBlockingQueue

這是一個支持優先級的無界祖蘇隊列,默認狀況下采起天然順序升序排序,也能夠經過構造函數指定 Comparator 來對元素進行排序。可是它不能保證相同優先級元素的順序。數組

底層是採用二叉最大堆來實現優先級排序的。緩存

DelayQueue

這是一個支持延時獲取元素的無界阻塞隊列,其隊列使用優先隊列 PriorityQueue 實現。隊列中的元素必須實現 Delayed 接口,建立元素時能夠指定多久以後才能從隊列中獲取該元素,只有在元素到期時才能獲取。併發

主要用於緩存,如清除緩衝中超時的數據。還用於定時任務的調度。框架

元素建立時,要實現 Delayed 接口,首先進行初始化;而後實現 getDelay(Timeunit unit)方法,返回的值是當前元素還須要延時多長時間;最後實現compareTo(Delayed other)方法,用來指定元素的順序。ide

當消費者從隊列中獲取元素時,若是元素尚未到延時時間,就阻塞當前線程。此外,設置了 leader 變量表示等待獲取隊列頭部元素的線程。若是 leader 不爲空,表示有現成等待獲取隊列頭部元素,使用 await() 方法讓當前線程等待信號。若是 leader 爲空,則把當前線程設置爲 leader,使用 awaitNanos() 方法讓當前線程等待接收信號或等待 delay 時間。

SynchronousQueue

與其餘阻塞隊列不一樣,這是一個不存儲元素的阻塞隊列,每個 put 操做必需要等待一個take操做,不然不能繼續添加元素,反之亦然。分爲公平和不公平訪問隊列,默認狀況採用非公平性策略訪問隊列。

該種隊列自己不存儲任何元素,適合傳遞性場景,把生產者線程處理的數據直接傳遞給消費者線程,其吞吐量高於 LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue

這是一個由鏈表結構組成的 FIFO 的無界阻塞 TransferQueue 隊列。它採起一種預佔模式,也就是有就直接拿走,沒有就佔着這個位置直到拿到、超時或中斷。相對於其餘阻塞隊列,多了 tryTransfer 方法和 transfer 方法。

  • transfer(e,[timeout,unit]) 方法: 若是當前有消費者正等待接收元素,該方法能夠把生產者傳入的元素馬上傳輸給消費者。若是沒有消費者等待,該方法將元素存放在隊列的 tail 節點,等到該元素被消費者消費了才返回。
  • tryTransfer(e,[timeout,unit])方法: 試探生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,返回false。該方法不管消費者是否接收都當即返回,而 transfer 方法必須等消費了才返回。

LinkedBlockingDeque

是一個由鏈表組成的雙向阻塞隊列。能夠從隊列兩端插入和移除元素。

Fork/Join框架

該框架主要應用在並行計算中,把一個大人物分割成若干個小任務,最終彙總每一個小任務結果後獲得大結果的框架。Fork 就是把一個大任務切分紅若干子任務並行的執行,Join 就是合併這些子任務的執行結果,最終獲得這個大任務的結果。

工做竊取算法

工做竊取是指某個線程從其餘隊列裏竊取任務來執行。一般使用雙端隊列,被竊取任務線程永遠從雙端隊列頭部拿任務執行,竊取任務的線程永遠從雙端隊列尾部拿任務執行。

優勢是充分利用線程進行並行計算,減小了線程間的競爭。缺點是在某些狀況下存在競爭,好比隊列只有一個任務時,會消耗更多的資源。

框架設計思路

首先,分割任務,將一個大任務分割成子任務,不停分割直到分割出的子任務足夠小。

而後,執行任務併合並結果。分割的子任務分別放在雙端隊列,而後幾個啓動線程分別從雙端隊列獲取任務執行。執行結果放在一個隊列裏,啓動一個線程從隊列拿數據,而後合併這些線程。

示例

public class ForkJoinCase extends RecursiveTask<Integer> {
    private final int threshold=5;
    private int first;
    private int last;

    public ForkJoinCase(int first,int last){
        this.first=first;
        this.last=last;
    }


    @Override
    protected Integer compute() {
        int ret=0;
        if(last-first<=threshold){//任務足夠小,執行
            for(int i=first;i<=last;i++){
                ret+=i;
            }
        }else{//分解任務
            int mid=first+(last-first)/2;
            ForkJoinCase leftTask=new ForkJoinCase(first,mid);
            ForkJoinCase rightTask=new ForkJoinCase(mid+1,last);
            //執行子任務
            leftTask.fork();
            rightTask.fork();
            //合併子任務結果
            ret=leftTask.join()+rightTask.join();
        }
        return ret;
    }
}
複製代碼

參考資料

相關文章
相關標籤/搜索