ConcurrentHashMapnode
在多線程環境下,使用HashMap進行put操做會引發死循環,致使CPU利用率近100%。由於多線程會致使HashMap的Entry鏈表造成環形數據結構,一旦造成環形數據結構,Entry的next節點永遠不爲空,會死循環的獲取Entry。linux
final HashMap<String, String> map = new HashMap<String, String>(2); Thread t = new Thread(new Runnable(){ public void run(){ for(int i = 0; i < 1000; i++){ new Thread(new Runnable(){ public void run(){ map.put(UUID.randomUUID().toString(), ""); } }, "ftf" + i).start(); } } }, "ftf"); t.start(); t.join();
HashTable使用synchronized保證線程安全,但在線程競爭激烈的狀況下HashTable的效率低下。當一個線程訪問HashTable的同步方法時,其餘線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態訪問。HashTable在多個線程訪問的時須要競爭同一把鎖,所以效率低下。ConcurrencyHashMap採用鎖分段技術。先將數據分紅一段一段地存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個數據的時候,其餘段的數據也能被其餘線程訪問。算法
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖(ReentrantLock),在ConcurrencyHashMap中扮演鎖的角色;HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組。Segment是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HasnEntry數組裏的元素,黨對HashEntry數組的數據進行修改時,必須先獲取與它對應的Segment的鎖。數組
ConcurrentHashMap的初始化方法是經過initialCapacity,loadFactor和concurrencyLevel等幾個參數來初始化segment數組,段偏移量segmentShifr,段掩碼segmentMask和每一個segment裏的HashEntry數組來實現的。緩存
if(concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0; int ssize = 1; while(ssize < concurrencyLevel){ ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize);
segments數組的長度ssize是經過concurrencyLevel計算得出的。爲了能經過按位與的散列算法來定位segments數組的索引,必須保證segments數組的長度是2的N次方(power-of-two size),因此必須計算出一個大於或等於concurrencyLevel的最小的2的N次方值來做爲segments數組的長度。concurrencyLevel的最大值是65535,這意味着segments數組的長度最大爲65536,對應的二進制是16位。安全
segmentShift和segmentMask須要在定位segment時的散列算法裏使用。sshift等於ssize從1向左移位的次數,在默認狀況下concurrencyLevel等於16,1須要向左移動4次,因此sshift爲4。segmentShift用於定位參與散列運算的位數,segmentShift等於32(ConcurrentHashMao的hash*(輸出的最大數是32位)減sshift,即2。segmentMask是散列運算的掩碼,等於ssize減1,即15。掩碼的二進制各個位的值都是1.由於ssize的最大長度是65536,因此segmentShift最大值是16,segmentMask最大值是65535,對應的二進制是16位,每一個位都是1。數據結構
初始化每一個segment多線程
輸入參數intialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每一個segment的負載因子,在構造方法裏須要經過這兩個參數來初始化數組中的每一個segment。app
if(initialCapacity > MAXIMUM_CAPACITY){ initialCapacity = MAXIMUM_CAPACITY; } int c = initialCapacity / ssize; if(c * ssize < initialCapacity) ++c; int cap = 1; //segment中的HashEntry數組的長度 while(cap < c) cap <<= 1; for(int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K, V>(cap, loadFactor); //segment的容量threshold=(int)cap*loadFactor。默認狀況下,initialCapacity是16,loadFactor是0.75,cap爲1,threshod爲0
定位Segment框架
ConcurrentHashMap會使用Wang/Jenkins hash的變種算法對元素的hashCode進行一次再散列。再進行散列的目的是減小散列衝突,使元素可以均勻地分佈在不一樣的Segment上,從而提升容器的存取效率、
private static int has(int h){ h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
ConcurrentHashMao經過散列算法定位segment。默認狀況下,segmentShift爲28,segmentMask爲15,在散列後的數最大是32位的二進制數據,向右無符號移動28位(讓高4位參加到散列運算中),
final Segment<K, V> segmentFor(int hash){ return [segments(hash >>> segmentShift) & segmentMask]; }
ConcurrentHashMap的操做
get():先通過一次散列,而後在使用這個散列值經過算咧運算定位到Segment,再經過散列算法定位到元素。get整個過程都不須要加鎖,除非讀到的值是空纔會加鎖重讀。get方法裏將要使用的共享變量都定義成了volatile類型。由於Java內存模型的happen before原則,對volatile字段的寫入操做會優先於讀操做,即便兩個線程同時修改和獲取volatile變量,get操做也能夠拿到最新值。定位HashEntry和定位的Segment的散列算法雖然都是與數組的長度減去1再相"與",定位Segment使用的是元素的hashcode經過再散列後獲得的值的高位,而定位HashEntry直接使用過的是散列後的值。
public V get(Object key){ int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }
put():put須要對共享變量進行寫入操做,因此在操做共享變量是必須加鎖。put先定位到Segment,而後在Segment裏面進行插入操做。插入操做經歷兩個步驟,第一步判斷是否須要對Segment裏的HashEntry數組進行擴容,第二部定位添加元素的位置,而後將其放在HashEntry數組裏面。
是否須要擴容:在插入元素前會先判斷Segment裏的HashEntry數組是否超過容量(threshold),若超過閾值,則對數組進行擴容。Segment的擴容判斷比HashMap更恰當,若HashMap在插入元素後判斷元素已經達到容量後再進行擴容,以後若沒有新元素插入,則HashMap就進行了無效的擴容
如何擴容:在擴容時,先建立一個容量是原來容量二倍的數組,而後將原數組裏的元素進行再散列後插入到新的數組中,ConcurrentHashMap只會對某個segment進行擴容。
size():統計整個ConcurrentHashMap裏元素的大小。ConcuurentHashMap先嚐試2次經過不鎖住Segment的方式來統計各個Segment大小。若統計過程當中容器的count發生了變化(使用modCount變量,在put,remove和clean方法元素操做前將變量modCount加1,在統計size先後比較modCount是否發生變化),則再採用加鎖的方式來統計全部Segment的大小。
實現一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另外一種是使用非阻塞算法。使用阻塞算法的隊列能夠用一個鎖(入隊出隊同一把鎖)或兩個鎖(入隊和出隊用不一樣的鎖)等方式起來實現。非阻塞的實現方式則可使用循環CAS的方式來實現
ConcurrentLinkedQueue
ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列。它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部;當咱們獲取一個元素時,它會返回隊列頭部的元素。它採用了「wait-free「算法來實現。ConcurrentLinkedQueue由head節點和tail節點組成,每一個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,該節點與節點之間就是經過next關聯起來,從而組成一張鏈表結構的隊列。默認狀況下head節點存儲的元素爲空,tail節點等於head節點。
入隊列
入隊列就是將入隊節點添加到隊列的尾部。入隊主要作兩件事:第一是將入隊節點設置成當前隊列尾節點的下一個節點;第二是更新tail節點,若tail節點的next節點不爲空,則將入隊節點設置成tail節點,若tail節點的next節點爲空,則將入隊節點設置城tail的next節點,因此tail節點不老是尾節點。
public boolean offer(E e){ if(e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); retry:
// 入隊不成功反覆入隊 for(;;){ Node<E> t = tail; Node<E> p = t; for(int hops = 0; ; hops++){
// 獲取p的下一個節點 Node<E> next = succ(p); if(next != null){ if(hops > HOPS && t != tail) continue retry; p = next; }else if(p.casNext(null, n)){
//若tail節點有大於等於1個next節點,則將入隊節點設置成tail節點。 if(hops >= HOPS)
casTail(t, n); return true; //永遠返回true,不要經過返回值判斷入隊是否成功 }else{ p = succ(p); } } } }
定位尾節點
tail節點並不是老是尾節點。尾節點多是tail節點,也多是tail節點的next節點。
final Node<E> succ(Node<E> p){ Node<E> next = p.getNext(); return (p==next) head : next; }
設置入隊節點爲尾節點
p.casNext(null, n)方法將入隊節點設置爲當前隊尾節點的next節點,若p是null則表示p是當前隊列的尾節點,若不爲null,則表示有其餘線程更新了尾節點,須要從新獲取當前隊列的尾節點
HOPS
doug lea使用hops變量來控制並減小tail節點的更新頻率,並非每次節點入隊後都將tail節點更新爲尾節點,而是當tail節點和尾節點的距離大於等於常量HOPS的值時才更新tail節點,tail和尾節點的距離越長,使用CAS更新tail節點的次數就會越少,但每次入隊時定位尾節點的時間久越長
出隊列
出隊列就是從隊列裏返回一個節點元素,並清空該節點對元素的引用。當head節點裏有元素時,直接彈出head節點裏的元素,而不會更新head節點。當head節點沒有元素時,出隊操做纔會更新head節點。
public E poll(){ Node<E> h = head; Node<E> p = h; for(int hops = 0; ; hops++){
// 獲取p節點元素 E item = p.getItem();
// 若p節點的元素不爲空,使用CAS設置p幾點引用的元素爲null,成功則返回p if(item != null && p.casItem(item, null)){ if(hops >= HOPS){ Node<E> q = p.getNext(); updateHead(h, (q != null) q : p); } return item; }
//若頭節點的元素爲空或頭節點發生了變化,則頭節點已被另外一個線程改了,此時選取p的下一個節點 Node<E> next = succ(p);
// 若p的下一個節點也空,則說明隊列已空 if(next == null){ updateHead(h, p);
break; }
// 若一個元素不爲空,則將頭節點的下一個節點設置成頭節點 p = next; } return null; }
阻塞隊列
阻塞隊列是一個支持兩個附件操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法。
支持阻塞的插入方法:當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿時
支持阻塞的移除方法:在隊列爲空時,獲取元素的線程會等待隊列變爲非空
阻塞隊列經常使用於生產者進而消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者用來存放元素,消費者用來獲取元素的容器。
插入和移除操做的4種處理方式
方法/處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
拋出異常:當隊列滿時,若再往隊列裏插入元素,會拋出IllegalStateException("Queuefull")異常。當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException異常。
返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。若移除方法,則是從隊列裏取出一個元素,不然返回null。
一直阻塞:當阻塞隊列滿時,若生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到隊列可用或響應中斷退出。當隊列空時,若消費者線程從隊列裏take元素,隊列會阻塞住消費者線程,直到隊列不爲空。
超時退出:當阻塞隊列滿時,若生產者線程往隊列裏插入元素,隊列會阻塞生產者線程一段時間。若超過指定時間,生產者線程就會退出。
JDK提供7個阻塞隊列
ArrayBlockingQueue:由數組結構組成的有界阻塞隊列
LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列
PriorityBlockingQueue:支持優先級排序的無界阻塞隊列
DelayQueue:使用優先級隊列實現的無界阻塞隊列
SychronousQueue:不存儲元素的阻塞隊列
LinkedTransferQueue:鏈表結構組成的無界阻塞隊列
LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列
公平訪問隊列是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對等待的線程是非公平的,當隊列可用時,阻塞的線程均可以爭奪訪問隊列的資格,有可能縣阻塞的線程最後才訪問隊列。
ArrayBlockingQueue
ArrayBlockingQueue按照先進先出的原則對元素進行排序。默認狀況下不保證線程公平的訪問隊列
public ArrayBlockingQueue(int capacity, boolean fair){ if(capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue
LinkeBlockingQueue的默認的和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
PriorityBlockingQueue
PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認狀況下元素採起天然順序升序排序。也能夠自定義類實現compareTo()方法來指定元素排序規則,或初始化PriorityBlockingQueue時指定構造參數Comparator來對元素進行排序。不能保證同優先級元素的順序。
DelayQueue
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從對了了中提取元素。
應用場景以下:
緩存系統的設計:可使用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了
定時任務調度:使用DelayQueue保存當前將會執行的任務和執行時間,一旦從DelayQueue中獲取到到任務就開始執行。
實現Delayed接口:
在建立對象時,初始化基本數據。使用time記錄當前對象延遲到何時可使用,使用sequenceNumber來標識元素在隊列中的前後順序。
private static final AtomicLong sequencer = new AtomicLong(0); ScheduledFutureTask(Runnable r, V result, long ns, long period){ super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); }
實現getDelay方法。該方法返回當前元素還須要延時多長時間,單位是納秒。
public void getDelay(TimeUnit unit){ return unit.convert(time - now(), TimeUnit.NANOSECONDS); }
實現compareTo方法來指定元素的順序。
public int compareTo(Delayed other){ if(other == this) return 0; if(other instanceof ScheduledFutureTask){ ScheduledFutureTask<> x = (ScheduledFutureTask<>) other; long diff = time - x.time; if(diff < 0) return -1; else if(diff > 0) return 1; else if(sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) 0 : ((d < 0) -1 : 1); }
延時阻塞隊列的實現
當消費者從隊列裏獲取元素時,若元素沒有達到延時時間,就阻塞當前線程
long delay = first.getDelay(TimeUnit.NANOSECONDS); if(delay <= 0) return q.poll(); else if(leader != null) // leader是一個等待獲取隊列頭部元素的線程。若leader不等於空,表示已經有線程在等待獲取隊列的頭元素 available.await(); else{ Thread thisThread = Thread.currentThread(); leader = thisThread; try{ available.awaitNanos(delay); }finally{ if(leader == thisThread) leader = null; } }
SynchronousQueue
SynchronousQueue的每個put操做必須等待一個take操做,不然不能繼續添加元素。它支持公平訪問隊列。默認狀況下線程採用非公平性策略訪問隊列
public SynchronousQueue(boolean fair){ transferer = fair ? new TransferQueue() : new TransferStack(); }
SynchronousQueue負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素。
LinkedTransferQueue
transfer():當前有消費者正在等待接收元素(消費者使用take()或帶有時間限制的poll()),transfer方法能夠把生產者傳入的元素馬上transfer給消費者。若沒有消費者在等待接收元素,transfer方法將元素存放在隊列的tail節點,並等待該元素被消費者消費了才返回。
Node pre = tryAppend(s, haveData); //將當前節點做爲tail節點 return awaitMatch(s, pred, e, (how == TIMED), nanos); //讓CPU自旋等待消費者消費元素
tryTransfer():用來試探生產者傳入的元素是否能直接傳給消費者。若沒有消費者接受元素,則返回false。tryTransfer方法不管消費者是否接受都馬上返回,而transfer方法必須等到消費者消費後才返回。tryTransfer(E e, long timeout, TimeUnit unit)方法試圖把生產者傳入的元素直接傳給消費者,若沒有消費者消費該元素則等待指定的時間再返回,若超時還沒消費元素,則返回false,若在超時時間內消費了元素,則返回true。
LinkedBlockingDeque
LinkedBlockingDeque是由鏈表組成的雙向阻塞隊列。雙向隊列指的是能夠從隊列的兩端插入和移除元素。雙向隊列由於多了一個操做對了的入口,在多線程同時入隊時,減小了一半的競爭。相對於其餘阻塞隊列,LinkedBlockingDeque多了addFirst,addList,offerFirst,offerLast,peekFirst和peekLast等方法。
阻塞隊列的實現原理
使用通知模式的實現
通知模式就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個對了中的元素後,會通知生產者當前隊列可用。
private final Condition notFull; private finla Condition notEmpty; public ArrayBlockingQueue(int capacity, boolean fair){ notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException{ checkNotFull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try{ while(count == items.length) notFull.await(); insert(e); }finally{ lock.unlock(); } } public E take() throws InterruptedException(){ final ReentrantLock lock = this.lock; lock.lokcInterruptibly(); try{ while(count == 0) notEmpty.await(); return extract(); }finally{ lock.unlock(); } } private void insert(E x){ items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
當往隊列裏插入一個元素時,若對了不可用,那麼阻塞生產者主要經過LockSupport.park(this)來實現
pubilc final void await() throws InterruptedException{ if(Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullRelease(node); int interruptMode = 0; while(!isOnSyncQueue(node)){ LockSupport.park(this); if((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if(acquireQueued(node, savedState) && interrutMode != THROW_IE) interruptMode = REINTERRUPT; if(node.nextWaiter != null) unlinkCancelledWaiters(); if(interruptMode != 0) reportInterruptAfterWait(interruptMode); }
public static void park(Object blocker){ Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
park會阻塞當前線程,只有一下4種狀況中的一種發生時,該方法纔會返回:
與park對應的unpark執行或已經執行時。「已經執行」是指unpark先執行,而後在執行park的狀況
線程被中斷時
等待完time參數指定的毫秒數時
異常現象發生時
JVM在linux下實現park的方式是使用系統方法pthread_cond_wait。pthread_cond_wait是一個多線程的條件變量函數。這個方法接受兩個參數:一個共享變量_cond,一個互斥量_mutex,unpark是使用pthread_cond_signal實現的。
Fork/Join
Fork/Join是一個把大人物分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。
工做竊取(work-stealing)算法
工做竊取算法是指某個線程從其餘隊列裏竊取任務來執行。
當把一個當任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列裏,並未每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。有的線程會先幹完本身的任務,此時其餘線程對應的隊列裏還有任務等待處理。因而該線程就去其餘線程的隊列裏竊取一個任務來作。此時他們會訪問同一個隊列,爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從隊列的頭部拿任務執行,竊取任務的線程從雙端隊列的尾部任務執行。
優勢:充分利用線程進行並行計算,減小了線程間的競爭
缺點:在雙端隊列裏只有一個任務時會存在競爭,而且該算法會消耗了更多的系統資源。
設計
分割任務:須要一個fork類來把大任務分割成子任務,須要不停地分割直到分割出的子任務足夠小。
執行任務併合並結果:分割的子任務分別放在雙端隊列裏,而後幾個啓動線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據
Fork/Join使用兩個類來完成上述設計:
ForkJoinTask:須要使用ForkJoin框架,必須先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操做的機制。一般狀況下,咱們不須要直接繼承ForkJoinTask,只須要繼承它的子類,Fork/Join只提供了兩個子類:
RecursiveAction:用於沒有返回結果的任務
RecursiveTask:用於有返回結果的任務
ForkJoinPool:ForkJoinTask須要經過ForkJoinPool來執行
任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘線程的隊列的尾部獲取一個任務。
public class CountTask extends RecursiveTask<Integer>{ private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if(canCompute){ for(int i = start; i <= end; i++) sum += i; }else{ int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); leftTask.fork(); // 每一個子任務調用fork方法時又會進入compute方法,看當前子任務是否有必要分割成子任務,若不須要則執行當前任務,不然繼續分割。 rightTask.fork(); int leftResult = leftTask.join(); int rightResult = rightTask.join(); sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1, 4); Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (Exception e) { } } }
ForkJoinTask在執行的時候可能會拋出異常,可是沒辦法在主線程中捕獲異常。ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已被取消。而且能夠經過ForkJoinTask的getException方法獲取異常。getException方法返回Throwable對象,若任務被取消了則返回CancellationException。若任務沒有完成或被拋出,則返回null。
ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將程序提交給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。
ForkJoinTask的fork方法實現原理
當咱們調用ForkJoinTask的fork方法時,程序會調用ForkJoinWorkerThread的pushTask方法異步地執行這個任務,而後當即返回結果。
public final ForkJoinTask<V> fork(){ ((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this); return this; }
push方法把當前任務存放在ForkJoinTask數組隊列裏。而後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工做線程來執行。
final void pushTask(ForkJoinTask<> t){ ForkJoinTask<>[] q; int s, m; if((q = queue) != null){ long u = (((s = queueTop) & (m = q.length -1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; if((s -= queueBase) <= 2) pool.signalWork(); else if(s == m) growQueue(); } }
ForkJoinTask的join方法實現原理
Join方法的主要做用是阻塞當前線程並等待獲取結果。
public final V join(){ jf(doJoin() != NORMAL) return reportResult(); else return getRawResult(); } private V reportResult(){ int s; Throwable ex; if((s = status) == CANCELLED) throw new CancellationException(); if(s == EXCEPTIONAL && (ex = getThrowableException()) != null) UNSAFE.throwException(ex); return getRawResult(); }
調用doJoin()方法獲得當前任務的狀態來判斷返回什麼結果。任務狀態由4種:已完成(NORMAL),被取消(CANCELLED),信號(SIGNAL)和出現異常(EXCEPTIONAL)。
若任務狀態是已完成,則直接返回任務結果
若任務狀態是被取消,則直接拋出CancellationException
若任務狀態是拋出異常,則直接拋出對應的異常
private int doJoin(){ Thread t; ForkJoinWorkerThread w; int s; boolean completed; if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread){ if((s = status) < 0) //查看狀態,若任務完成,則直接返回任務狀態 return s; if((w = (ForkJoinWorkerThread)t).unpushTask(this)){ try{ completed = exec(); }catch(Throwable ex){ return setExceptionalCompletion(rex); } if(completed) return setCompletion(NORMAL); } return w.joinTask(this); }else return externalAwaitDone(); }