第六章 Java併發容器和框架

ConcurrentHashMap的實現原理與使用

ConcurrentHashMap是線程安全且高效的hashmap。本節讓咱們一塊兒研究一下該容器是如何在保證線程安全的同時又能保證高效的操做。node

爲何要使用ConcurrentHashMap

在併發編程中使用HashMap可能致使程序死循環。而使用線程安全的HashTable效率又很是低下,基於以上兩個緣由,便有了ConcurrentHashMap的登場機會。算法

(1)線程不安全的HashMap編程

在多線程環境下,使用HashMap進行put操做會引發死循環,致使CPU利用率接近100%,因此在併發狀況下不能使用HashMap。例如一下代碼數組

    final HashMap<String, String> map = new HashMap<>(2);
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        map.put(UUID.randomUUID().toString(), "");
                    }
                }, "ftf" + i).start();
            }
        }
    }, "ftf");
    thread.start;
    thread.join;

HashMap在併發執行put操做時會引發死循環,是由於多線程會致使HashMap的Entry鏈表造成環形數據結構,一旦造成環形數據結構,Entry的next節點永遠不會爲空,就會產生死循環獲取Entry.緩存

(2)效率低下的HashTable安全

HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法,其餘線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。數據結構

(3)ConcurrentHashMap的鎖分段技術可有效提高併發訪問率多線程

HashTable容器在競爭激烈的併發環境下表現出效率低下的緣由是全部訪問HashTable的線程都必須競爭同一把鎖,假如容器裏有多把鎖,每一把鎖用於鎖容器的一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效提升併發訪問率,這就是ConcurrentHashMap所使用的鎖分段技術。首先將數據分紅一段一段地存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘數據段也能被其餘線程訪問。併發

 

ConcurrentHashMap的結構

ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖(ReentrantLock),在ConcurrentHashMap裏扮演鎖的角色;HashEntry則用於儲存鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組。Segment的結構和HashEntry相似,是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到與它對於的Segment鎖。app

 

ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是經過initialCapacity,loadFactor和concurrencyLevel等幾個參數來初始化Segment數組、段偏移量segmentShift、段掩碼segmentMask和每一個segment裏的HashEntry數組來實現的。

初始化segments數組:讓咱們來看一下初始化segments數組的源代碼

if(concurrencyLevel > MAX_SEGMENTS)  concurrencyLevel = MAX_SEGMENTS 
 int sshift = 0;
        int ssize = 1;
        while (ssize < DEFAULT_CONCURRENCY_LEVEL) {
            ++sshift;
            ssize <<= 1;
        }
        int segmentShift = 32 - sshift;
        int segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);

由上面代碼可知,segments數組的長度ssize是經過concurrencyLevel計算得出的。爲了能經過按位與的散列算法來定位segments數組的索引,必須保證segments數組的長度是2的N次方,因此必須計算出一個大於或等於concurrencyLevel的最小的2的N次方值來做爲segments數組的長度。concurrencyLevel的最大值是65535,這意味着segments數組的長度最大爲65536,對應的二進制是16位。

初始化segmentShift和segmentMask:這兩個全局變量須要在定位segment時的散列算法裏使用,sshift等於ssize從1向左移位的次數,在默認狀況下concurrencyLevel等於16,1須要向左移位移動4次,因此sshift等於4。segmentShift用於定位參與散列運算的位數,segmentShift等於32減去sshift,因此等於28,這裏之因此用32是由於ConcurrentHashMap裏的hash()方法輸出的最大數是32位的。segmentMask是散列運算的掩碼,等於ssize減1,即15,掩碼的二進制各個位的值都是1.由於ssize的最大長度是65536,因此segmentShift最大值是16,segmentMask最大值是65535,對應的二進制是16位,每一個位都是1.

初始化每一個segment:輸入參數initalCapacity是ConcurrentHashMap的初始化容量,loadfactor是每一個segment的負載因子,在構造方法裏須要經過兩個參數來初始化數組中的每一個segment。

 

定位segment

既然ConcurrentHashMap使用分段鎖Segment來保護不一樣段的數據,那麼在插入和獲取元素的時候,必須先經過散列算法定位到segment。ConcurrentHashMap會首先使用Wang/Jenkins hash的變種算法對元素的hashCode進行一次再散列。之因此進行再散列,目的是減小散列衝突,使元素可以均勻地分佈在不一樣的Segment上,從而提升容器的存取效率。假如散列的質量差到極點,那麼全部的元素都在一個Segment中,不只存取元素緩慢,分段鎖也會失去意義。

默認狀況下segmentShift爲28,segmentMask爲15,再散列後的數量最大是32位二進制數據,向右無符號移動28位,意思是讓高4位參與到散列運算中,(hash>>>segmentShit)&segmentMask的運算結果分別是4,15,7和8,能夠看到散列值沒有發生衝突。

 

ConcurrentHashMap的操做

get操做:Segment的get操做實現很是簡單和高效。先通過一次再散列,而後使用這個散列值經過散列運算定位到Segment,再經過散列算法定位到元素,代碼以下

public V get(Object key){
 int hash = hash(key.hashCode());
 return segmentFor(hash).get(key,hahsh)   
}

get操做的高效之處在於整個get過程不須要加鎖,除非讀到的值是空纔會加鎖重讀。它的get方法裏將要使用的共享變量都定義成volatile類型,如用於統計當前Segment大小的count字段和用於存儲值的HashEntry的value,定義成volatile的變量,可以在線程之間保持可見性,可以被多線程同時讀,而且保證不會讀到過時的值,可是隻能被單線程寫(有一種狀況能夠被多線程寫,就是寫入的值不依賴與原值),在get操做裏只須要讀不須要寫共享變量count和value,因此能夠不用加鎖。之因此不會讀到過時的值,是由於根據Java內存模型的happen before原則,對volatile字段的寫入操做先於讀操做,即便兩個線程同時修改和獲取volatile變量,get操做也能拿到最新的值,這是用volatiel替換鎖的經典應用場景。

transient volatile int count;
volatile V value;

在定位元素的代碼裏咱們能夠發現,定位HashEntry和定位Segment的散列算法雖然同樣,都與數組的長度減去1再相「與」,可是想「與」的值不同,定位segment使用的是元素的hashcode經過再散列後獲得的值的高位,而定位HashEntry直接使用的是再散列後的值。其目的是避免兩次散列後的值同樣,雖然元素在Segment裏散列開了,可是卻沒有再HashEntry裏散列開。

put操做:因爲put方法裏須要對共享變量進行寫入操做,因此爲了線程安全,在操做共享變量時必須加鎖。put方法首先定位到Segment,而後再Segment裏進行插入操做。插入操做須要經歷兩個步驟,第一步判斷是否須要對Segment裏的HashEntry數組進行擴容,第二步添加元素的位置,而後將其放在HashEntry裏。

(1)是否須要擴容:在插入元素前會先判斷Segment裏的HashEntry數組是否超過容量,若是超過闕值,則對數組進行擴容。值得一提的是,Segment的擴容判斷比HashMap更恰當,由於HashMap是在插入元素後判斷元素是否已經到達容量的,若是達到了就進行擴容,可是頗有可能擴容以後沒有新元素插入,這時HashMap就進行了一次無效的擴容,

(2)如何擴容:在擴容的時候,首先會建立一個容量是原來容量兩倍的數組,而後將原數組裏的元素進行再散列後插入到新的數組裏。爲了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。

(3)size操做:若是要統計整個ConcurrentHashMap裏元素的大小,就必須統計全部Segment裏元素的大小後求和。Segment裏的全局變量count是一個colatile變量,那麼在多線程場景下,是否是直接把全部的Segment的count相加就能夠獲得整個ConcurrentHashMap大小呢?不是的,雖然相加時能夠獲取每一個Segment的count最新值,可是可能累加前使用count發生了變化,那麼統計結果就不許了。因此,最安全的作法是在統計size的時候把全部Segment的put,remone和clean方法所有鎖住,可是這種作法顯然很是抵消。由於在累加count操做過程當中,以前累加過的count發生變化的概率很是小,因此ConcurrentHashMap的作法是先嚐試2次經過不鎖住Segment的方式來統計各個Segment大小,若是統計的過程當中,容器的count發生了變化,則再採用加鎖的方式來統計全部Segment大小。ConcurrentHashMap如何判斷統計時候發生了變化呢?使用modCount變量,在put,remove和clean方法裏操做元素前都會將變成modCount進行加1,那麼在統計size先後比較modCount是否發生變化,從而得知容器的大小是否發生變化。

 

ConcurrentLinkedQueue

在併發編程中,有時候須要使用線程安全的隊列。若是要實現一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另外一種是使用非阻塞算法。使用阻塞算法的隊列能夠用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用於不一樣的鎖)等方式實現。非阻塞的實現方式則可使用循環CAS的方式實現。

ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部;當咱們獲取一個元素時候,它會返回隊列頭部的元素。它採用了「wait-free」算法(即CAS算法)來實現,概算在Michael&Scoft算法上進行了一些修改。

ConcurrentLinkedQueue的結構

ConcurrentLinkedQueue由head節點和tail節點組成,每一個節點(node)由節點元素(item)和指向下一個節點(next)的引用組成,節點和節點之間就是經過這個next關聯起來,從而組成一張連接結構的隊列。默認狀況下head節點存儲的元素爲空,tail節點等於head節點。

private transient volatile Node<E> tail = head;

入隊列

(1)入隊列的過程

入隊列就是將入隊節點添加到隊列的尾部。爲了方便理解入隊時隊列的變化,以及head節點和tail幾點的變化,這裏以一個示例來展開介紹。假設咱們想在一個隊列依次插入4個節點,每添加一個節點就作了一個隊列的快照圖,以下

  1. 添加元素1。隊列更新head節點的next節點爲元素1節點。又由於tail節點默認狀況下等於head節點,因此它們的next節點都指向元素的1節點
  2. 添加元素2。隊列首先設置元素1節點的next節點爲元素2節點,而後更新tail節點指向元素2節點
  3. 添加元素3。設置tail節點的next節點爲元素3節點
  4. 添加元素4。設置元素3的next幾點爲元素4節點,而後將tail節點指向元素4節點

經過調試入隊過程並觀察head節點和tail節點的變化,發現入隊主要作兩件事情:

第一是將入隊節點設置成當前隊列尾節點的下一個節點;

第二是更新tail節點,若是tail節點的next節點不爲空,則將入隊節點設置成tail節點,反之,則將入隊及誒單設置成tail節點的next節點,因此節點不老是尾節點。

若是是多線程進入入隊,咱們經過源碼來詳細分析它是如何使用CAS算法來入隊的

 /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

從源代碼角度來看,整個入隊過程主要作兩件事情:

第一是定位出尾節點;

第二是使用CAS算法將入隊節點設置成尾節點的next節點,如不成功則重試

(2)定位尾節點

tail節點並不老是尾節點,因此每次入隊都必須先經過tail節點來找到尾節點,尾節點多是tail節點,也多是tail節點的next節點,代碼中循環體中的第一個if就是判斷tail是否有next節點,有則表示next節點的多是尾節點。獲取tail節點的next節點須要注意的是p節點等於p的next節點狀況,只有一種可能就是p節點和p的next節點都等於空,表示這個隊列剛剛初始化,正準備添加節點,因此須要返回head節點,獲取p節點的next節點代碼以下

   final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

(3)設置入隊節點爲尾節點

p.casNext(null, n)方法用於將入隊及誒單設置爲當前隊列尾尾節點的next節點,若是p是null,表示p是當前隊列的尾節點,若是不爲null,表示有其餘線程更新了尾節點,則須要從新獲取當前隊列的尾節點

(4)hops的設計意圖

上面分析過對於先進先出的隊列入隊所要作的事情是將入隊節點設置成尾節點,dong lea寫的代碼和邏輯仍是稍微有點複雜,那麼咱們用一下方式來實現是否可行?

    public boolean offer(E e){
        if(e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e);
        for(;;){
            Node<E> t = tail;
            if(t.casNext(null, n) && casTail(t, n)) return true;
        }
    }

讓tail節點永遠做爲隊列的尾節點,這樣實現代碼了很是少,並且邏輯清晰和易懂。可是,這麼作有一個缺點,每次都須要使用循環CAS更新tail節點。若是能減小CAS更新tail節點的次數,就能提升入隊的效率,全部dong lea使用hops變量來控制並減小tail節點的更新頻率,並非每次節點入隊後都將tail節點更新成尾節點,而是當tail節點和尾節點的距離大於等於常量HOPS的值(默認等於1)時才更新tail節點,tail和尾節點的距離越長,使用CAS更新tail節點的次數就會越少,可是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,由於循環體須要多循環一次來定位出尾節點,可是這樣仍然能提升入隊的效率,由於從本質上來看它經過增長對volatile變量的讀操做來減小對volatile變量的寫操做,而對volatile變量的寫操做開銷要遠遠大於讀操做,因此入隊效率會有所提高。

注意:入隊方法永遠返回true,因此不要經過返回值判斷入隊是否成功

出隊列

出隊列的就是從隊列裏返回一個節點元素,並清空該節點對元素的引用。每一個節點出隊的快照以下

由圖可知,並非每次出隊時都更新head節點,當head節點裏有元素時,直彈出head節點裏的元素,而不會更新head節點。只有當head節點裏沒有元素時,出隊操做纔會更新head節點。這種作法也是經過hops變量來減小使用CAS更新head節點的消耗,從而提升出隊效率。

 

Java中的阻塞隊列

什麼是阻塞隊列

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列,這兩個附加的操做支持阻塞的插入和移除方法。

(1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿

(2)支持阻塞的移除方法:意思是在隊列爲空時,獲取元素的線程會等待隊列變爲非空

阻塞隊列經常使用語生產者和消費者的場景,生產者向隊裏裏添加元素,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者存放元素,消費者獲取元素的容器。

在阻塞隊列不可用時,這兩個附加操做提供了4種處理方式

方法/處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
檢查方法 element() peek() 不可用 不可用

拋出異常:當隊列滿時,若是再插入元素,會拋出IllegalStateException異常。當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException異常

返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。若是是移除方法,則是從隊列裏取出一個元素,若是沒有則返回null

一直阻塞:當阻塞隊列滿時,若是生產者往隊列裏put元素,隊列會一直阻塞生產者,知道隊裏可用或者響應中斷退出。當隊列爲空時,若是消費者從隊列裏take元素,隊列會阻塞消費者線程,直到隊列不爲空

超時退出:當阻塞隊列滿時,若是生產者往隊裏裏插入元素,隊列會阻塞生產者線程一段時間,若是超過了指定的時間,生產者線程就會退出

注意:若是是無界阻塞隊列,隊裏不可能會出現滿的狀況,因此使用put或offer方法永遠不會被阻塞,並且使用offer方法時,該方法永遠返回true

Java裏的阻塞隊列

JDK提供了7種阻塞隊列:

  1. ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列
  2. LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列
  3. PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列
  4. DelayQueue:一個使用優先級隊列實現的無界阻塞隊列
  5. SynchronousQueue:一個不存儲元素的阻塞隊列
  6. LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列
  7. LingkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列

ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。默認狀況下不保證線程公平的訪問隊列。

LinkedBlockingQueue

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序

PriorityBlockingQueue

PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認狀況下元素採起天然順序升序排列。也能夠自定義類實現cpmpareTo()方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序,須要注意的是不能保證同優先級元素的順序

DelayQueue

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素

DelayQueue很是有用,能夠運用在如下場景:

  • 緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了
  • 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,好比TimeQueue就是使用DelayQueue實現的

 SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。

它支持公平訪問隊列,默認狀況下現場採用非公平性策略訪問隊列。使用如下構造方法能夠建立公平性訪問SynchronousQueue,若是設置成true,則等待的線程會採用先進先出的順序訪問隊列。SynchronousQueue能夠當作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素,很是適合傳遞性建立。SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue

LinkedTransferQueue

LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法

(1)tryTransfer

tryTransfer方法是用來試探生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接受元素,則返回false。和transfer方法的區別是tryTransfer方法不管消費者是否接受,方法當即返回,而transfer方法是必須等到消費者消費了才返回。

對於帶有時間限制的tryTransfer(E e,long timeout, TimeUnit unit)方法,試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待知道的時間再返回,若是超時尚未消費元素則返回false,反之返回true

(2)transfer

若是當前有消費者正在等待接受元素(消費者使用take()方法或帶時間限制的poll()方法),transfer方法能夠把生產者傳入的元素馬上transfer(傳輸)給消費者。若是沒有消費者在等待接受元素,transfer方法將會存放在隊列的tail節點,並等到該元素被消費者消費了才返回。

LingkedBlockingDeque

LingkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是能夠從隊列的兩端插入和移出元素,雙向隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘阻塞隊列,LingkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入。獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙端隊列的最後一個元素。另外,插入方法add等同於addLast,移除方法remove等效於removeFirst。可是take方法卻等同於takeFirst,不知道是否是JDK的bug,使用時仍是用帶有First和Last後綴的方法更清楚。

在初始化LingkedBlockingDeque時能夠設置容量防止其過分膨脹。另外,雙向阻塞隊列能夠運用在「工做竊取」模式中

 

阻塞隊列的實現原理

若是隊列是空的,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?

使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。經過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現

  /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }


    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

 

Fork/Join框架

什麼是Fork/Join框架

Fork/Join框架是Java7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。

工做竊取算法

工做竊取算法(work-stealing)是指某個線程從其餘隊列裏竊取任務來執行。

優勢:充分利用線程進行並行計算,減小了線程間的競爭

缺點:在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且該算法會消耗了更多的系統資源,好比建立多個線程和多個雙端隊列

 Fork/Join框架的設計

 步驟1:分割任務。首先咱們須要有一個fork類來把大任務分割成子任務,有可能子任務仍是很大,因此還須要不停地分割,直到分割出的子任務足夠小

步驟2:執行任務併合並結果。分割的子任務分別放在雙端隊列裏,而後幾個啓動線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據

Fork/Join使用兩個類來完成以上兩個事情。

  1. ForkJoinTask:咱們要使用Fork/Join框架,必須首先建立一個Fork/Join任務。它提供在任務中執行fork()和join()操做的機制。一般狀況下,咱們不須要直接繼承ForkJoinTask類,只須要繼承它的子類,Fork/Join框架提供瞭如下兩個子類
    1. RecursiveAction:用於沒有返回結果的任務
    2. RecursiveTask:用於有返回結果的任務
  2. ForkJoinPool:ForkJoinTask須要經過ForkJoinPool來執行

任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊裏的頭部。當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘工做線程的隊列的尾部獲取一個任務。

 Fork/Join框架的異常處理

ForkJoinTask在執行的時候可能會拋出異常,可是咱們沒辦法在主線程裏直接捕獲異常,全部ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,而且能夠經過ForkJoinTask的getException方法獲取異常。getException方法返回Throwable對方,若是任務被取消了則返回CancellationException。若是任務沒有完成或者沒有拋出異常則返回null

 Fork/Join框架的實現原理

ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將存放程序提交給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務

(1)ForkJoinTask的fork方法實現原理

當咱們調用ForkJoinTask的fork方法時,程序會調用ForkJoinWorkerThread的pushTask方法異步地執行這個任務,而後當即返回結果。pushTask方法把當前任務存放在ForkJoinTask數組隊列裏。而後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工做線程來執行任務。

(2)ForkJoinTask的join方法實現原理

join方法的主要做用是阻塞當前線程並等待獲取結果。首先它調用了doJoin()方法,經過doJoin()方法獲得當前任務的狀態來判斷返回什麼結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現異常(EXCEPTIONAL)

  • 若是任務狀態是已完成,則直接返回任務結果
  • 若是任務狀態是被取消,則直接拋出CancellationException
  • 若是任務時拋出異常,則直接拋出對應異常

在doJoin()方法裏,首先經過查看任務的狀態,看任務是否已經執行完成,若是執行完成,則直接返回任務狀態;若是沒有執行完,則從任務數組裏取出任務並執行。若是任務順利執行完成,則設置任務狀態爲NORMAL,若是出現異常,則記錄異常。並將任務狀態設置爲EXCEPTIONAL

使用 Fork/Join框架

 讓咱們經過一個簡單的需求來使用Fork/Join框架,需求是:計算1+2+3+4的結果

使用Fork/Join框架首先要考慮到時如何分割任務,若是但願每一個子任務最多執行兩個數相加,那麼咱們設置分割的闕值是2,因爲是4個數字相加,因此Fork/Join框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務而負責3+4,而後join兩個子任務的結果。由於是有結果的任務,因此必須繼承ResursiveTask,實現代碼以下:

public class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;//闕值
    private int start;
    private int end;

    public CountTask(int start, int end){
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        //若是任務足夠小就計算任務
        boolean canCompute = (end - start) <= THRESHOLD;
        if(canCompute){
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        }else {
            //若是任務大於闕值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //執行子任務
            leftTask.fork();
            rightTask.fork();
            //等待子任務執行完,並獲得其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //生成一個計算任務,負責計算1+2+3+4
        CountTask countTask = new CountTask(1, 4);
        //執行一個任務
        Future<Integer> result = forkJoinPool.submit(countTask);
        try {
            System.out.println(result.get());
        }catch (InterruptedException e){
            
        }catch (ExecutionException e){
            
        }
    }
}

 

經過這個例子,咱們進一步理解ForkJoinTask,ForkJoinTask與通常任務的主要區別在於它須要實現compute方法,在這個方法裏,首先須要判斷任務是否足夠小,若是足夠小就直接執行任務。反之就必須分割成兩個子任務,每一個子任務在調用fork方法時,又會進入compute方法,看着當前子任務是否須要繼續分割成子任務,若是不須要繼承分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並獲得其結果。

相關文章
相關標籤/搜索