Java併發容器和框架

 

ConcurrentHashMapnode

  在多線程環境下,使用HashMap進行put操做會引發死循環,致使CPU利用率近100%。由於多線程會致使HashMap的Entry鏈表造成環形數據結構,一旦造成環形數據結構,Entry的next節點永遠不爲空,會死循環的獲取Entry。linux

final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable(){
  public void run(){ 
    for(int i = 0; i < 1000; i++){
      new Thread(new Runnable(){
        public void run(){
          map.put(UUID.randomUUID().toString(), "");
        }
      }, "ftf" + i).start();
    }
  }
}, "ftf");
t.start();
t.join();

  HashTable使用synchronized保證線程安全,但在線程競爭激烈的狀況下HashTable的效率低下。當一個線程訪問HashTable的同步方法時,其餘線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態訪問。HashTable在多個線程訪問的時須要競爭同一把鎖,所以效率低下。ConcurrencyHashMap採用鎖分段技術。先將數據分紅一段一段地存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個數據的時候,其餘段的數據也能被其餘線程訪問。算法

 

  ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖(ReentrantLock),在ConcurrencyHashMap中扮演鎖的角色;HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組。Segment是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HasnEntry數組裏的元素,黨對HashEntry數組的數據進行修改時,必須先獲取與它對應的Segment的鎖。數組

  ConcurrentHashMap的初始化方法是經過initialCapacity,loadFactor和concurrencyLevel等幾個參數來初始化segment數組,段偏移量segmentShifr,段掩碼segmentMask和每一個segment裏的HashEntry數組來實現的。緩存

if(concurrencyLevel > MAX_SEGMENTS)
  concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
while(ssize < concurrencyLevel){
  ++sshift;
  ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);

   segments數組的長度ssize是經過concurrencyLevel計算得出的。爲了能經過按位與的散列算法來定位segments數組的索引,必須保證segments數組的長度是2的N次方(power-of-two size),因此必須計算出一個大於或等於concurrencyLevel的最小的2的N次方值來做爲segments數組的長度。concurrencyLevel的最大值是65535,這意味着segments數組的長度最大爲65536,對應的二進制是16位。安全

  segmentShift和segmentMask須要在定位segment時的散列算法裏使用。sshift等於ssize從1向左移位的次數,在默認狀況下concurrencyLevel等於16,1須要向左移動4次,因此sshift爲4。segmentShift用於定位參與散列運算的位數,segmentShift等於32(ConcurrentHashMao的hash*(輸出的最大數是32位)減sshift,即2。segmentMask是散列運算的掩碼,等於ssize減1,即15。掩碼的二進制各個位的值都是1.由於ssize的最大長度是65536,因此segmentShift最大值是16,segmentMask最大值是65535,對應的二進制是16位,每一個位都是1。數據結構

  初始化每一個segment多線程

    輸入參數intialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每一個segment的負載因子,在構造方法裏須要經過這兩個參數來初始化數組中的每一個segment。app

if(initialCapacity > MAXIMUM_CAPACITY){
  initialCapacity = MAXIMUM_CAPACITY;
}
int c = initialCapacity / ssize;
if(c * ssize < initialCapacity)
  ++c;
int cap = 1; //segment中的HashEntry數組的長度
while(cap < c)
  cap <<= 1;
for(int i = 0; i < this.segments.length; ++i)
  this.segments[i] = new Segment<K, V>(cap, loadFactor); //segment的容量threshold=(int)cap*loadFactor。默認狀況下,initialCapacity是16,loadFactor是0.75,cap爲1,threshod爲0

  定位Segment框架

    ConcurrentHashMap會使用Wang/Jenkins hash的變種算法對元素的hashCode進行一次再散列。再進行散列的目的是減小散列衝突,使元素可以均勻地分佈在不一樣的Segment上,從而提升容器的存取效率、

private static int has(int h){
  h += (h << 15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h << 3);
  h ^= (h >>> 6);
  h += (h << 2) + (h << 14);
  return h ^ (h >>> 16);
}

    ConcurrentHashMao經過散列算法定位segment。默認狀況下,segmentShift爲28,segmentMask爲15,在散列後的數最大是32位的二進制數據,向右無符號移動28位(讓高4位參加到散列運算中),

final Segment<K, V> segmentFor(int hash){
  return [segments(hash >>> segmentShift) & segmentMask];
}  

  ConcurrentHashMap的操做

    get():先通過一次散列,而後在使用這個散列值經過算咧運算定位到Segment,再經過散列算法定位到元素。get整個過程都不須要加鎖,除非讀到的值是空纔會加鎖重讀。get方法裏將要使用的共享變量都定義成了volatile類型。由於Java內存模型的happen before原則,對volatile字段的寫入操做會優先於讀操做,即便兩個線程同時修改和獲取volatile變量,get操做也能夠拿到最新值。定位HashEntry和定位的Segment的散列算法雖然都是與數組的長度減去1再相"與",定位Segment使用的是元素的hashcode經過再散列後獲得的值的高位,而定位HashEntry直接使用過的是散列後的值。

public V get(Object key){
  int hash = hash(key.hashCode());
  return segmentFor(hash).get(key, hash);
}

    put():put須要對共享變量進行寫入操做,因此在操做共享變量是必須加鎖。put先定位到Segment,而後在Segment裏面進行插入操做。插入操做經歷兩個步驟,第一步判斷是否須要對Segment裏的HashEntry數組進行擴容,第二部定位添加元素的位置,而後將其放在HashEntry數組裏面。

     是否須要擴容:在插入元素前會先判斷Segment裏的HashEntry數組是否超過容量(threshold),若超過閾值,則對數組進行擴容。Segment的擴容判斷比HashMap更恰當,若HashMap在插入元素後判斷元素已經達到容量後再進行擴容,以後若沒有新元素插入,則HashMap就進行了無效的擴容

      如何擴容:在擴容時,先建立一個容量是原來容量二倍的數組,而後將原數組裏的元素進行再散列後插入到新的數組中,ConcurrentHashMap只會對某個segment進行擴容。

    size():統計整個ConcurrentHashMap裏元素的大小。ConcuurentHashMap先嚐試2次經過不鎖住Segment的方式來統計各個Segment大小。若統計過程當中容器的count發生了變化(使用modCount變量,在put,remove和clean方法元素操做前將變量modCount加1,在統計size先後比較modCount是否發生變化),則再採用加鎖的方式來統計全部Segment的大小。

 

  實現一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另外一種是使用非阻塞算法。使用阻塞算法的隊列能夠用一個鎖(入隊出隊同一把鎖)或兩個鎖(入隊和出隊用不一樣的鎖)等方式起來實現。非阻塞的實現方式則可使用循環CAS的方式來實現

 

ConcurrentLinkedQueue

  ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列。它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部;當咱們獲取一個元素時,它會返回隊列頭部的元素。它採用了「wait-free「算法來實現。ConcurrentLinkedQueue由head節點和tail節點組成,每一個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,該節點與節點之間就是經過next關聯起來,從而組成一張鏈表結構的隊列。默認狀況下head節點存儲的元素爲空,tail節點等於head節點。

  入隊列

    入隊列就是將入隊節點添加到隊列的尾部。入隊主要作兩件事:第一是將入隊節點設置成當前隊列尾節點的下一個節點;第二是更新tail節點,若tail節點的next節點不爲空,則將入隊節點設置成tail節點,若tail節點的next節點爲空,則將入隊節點設置城tail的next節點,因此tail節點不老是尾節點。

 

public boolean offer(E e){
  if(e == null)
    throw new NullPointerException();
  Node<E> n = new Node<E>(e);
  retry:
  // 入隊不成功反覆入隊
for(;;){ Node<E> t = tail; Node<E> p = t; for(int hops = 0; ; hops++){
     // 獲取p的下一個節點 Node
<E> next = succ(p); if(next != null){ if(hops > HOPS && t != tail) continue retry; p = next; }else if(p.casNext(null, n)){
      //若tail節點有大於等於1個next節點,則將入隊節點設置成tail節點。 if(hops >= HOPS)
casTail(t, n);
return true; //永遠返回true,不要經過返回值判斷入隊是否成功 }else{ p = succ(p); } } } }

  定位尾節點

    tail節點並不是老是尾節點。尾節點多是tail節點,也多是tail節點的next節點。

final Node<E> succ(Node<E> p){
  Node<E> next = p.getNext();
  return (p==next) head : next;
}

  設置入隊節點爲尾節點

    p.casNext(null, n)方法將入隊節點設置爲當前隊尾節點的next節點,若p是null則表示p是當前隊列的尾節點,若不爲null,則表示有其餘線程更新了尾節點,須要從新獲取當前隊列的尾節點

  HOPS

    doug lea使用hops變量來控制並減小tail節點的更新頻率,並非每次節點入隊後都將tail節點更新爲尾節點,而是當tail節點和尾節點的距離大於等於常量HOPS的值時才更新tail節點,tail和尾節點的距離越長,使用CAS更新tail節點的次數就會越少,但每次入隊時定位尾節點的時間久越長

 

  出隊列

    出隊列就是從隊列裏返回一個節點元素,並清空該節點對元素的引用。當head節點裏有元素時,直接彈出head節點裏的元素,而不會更新head節點。當head節點沒有元素時,出隊操做纔會更新head節點。

public E poll(){
  Node<E> h = head;
  Node<E> p = h;
  for(int hops = 0; ; hops++){
// 獲取p節點元素 E item
= p.getItem();
// 若p節點的元素不爲空,使用CAS設置p幾點引用的元素爲null,成功則返回p
if(item != null && p.casItem(item, null)){ if(hops >= HOPS){ Node<E> q = p.getNext(); updateHead(h, (q != null) q : p); } return item; }
//若頭節點的元素爲空或頭節點發生了變化,則頭節點已被另外一個線程改了,此時選取p的下一個節點 Node
<E> next = succ(p);
// 若p的下一個節點也空,則說明隊列已空
if(next == null){ updateHead(h, p);
break; }
  // 若一個元素不爲空,則將頭節點的下一個節點設置成頭節點 p
= next; } return null; }

 

阻塞隊列

  阻塞隊列是一個支持兩個附件操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法。

  支持阻塞的插入方法:當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿時

  支持阻塞的移除方法:在隊列爲空時,獲取元素的線程會等待隊列變爲非空

  阻塞隊列經常使用於生產者進而消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者用來存放元素,消費者用來獲取元素的容器。

  插入和移除操做的4種處理方式

方法/處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
檢查方法 element() peek() 不可用 不可用

 

 

 

 

  拋出異常:當隊列滿時,若再往隊列裏插入元素,會拋出IllegalStateException("Queuefull")異常。當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException異常。

  返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。若移除方法,則是從隊列裏取出一個元素,不然返回null。

  一直阻塞:當阻塞隊列滿時,若生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到隊列可用或響應中斷退出。當隊列空時,若消費者線程從隊列裏take元素,隊列會阻塞住消費者線程,直到隊列不爲空。

  超時退出:當阻塞隊列滿時,若生產者線程往隊列裏插入元素,隊列會阻塞生產者線程一段時間。若超過指定時間,生產者線程就會退出。

  

JDK提供7個阻塞隊列

  ArrayBlockingQueue:由數組結構組成的有界阻塞隊列

  LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列

  PriorityBlockingQueue:支持優先級排序的無界阻塞隊列

  DelayQueue:使用優先級隊列實現的無界阻塞隊列

  SychronousQueue:不存儲元素的阻塞隊列

  LinkedTransferQueue:鏈表結構組成的無界阻塞隊列

  LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列

 

  公平訪問隊列是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對等待的線程是非公平的,當隊列可用時,阻塞的線程均可以爭奪訪問隊列的資格,有可能縣阻塞的線程最後才訪問隊列。

 

ArrayBlockingQueue

  ArrayBlockingQueue按照先進先出的原則對元素進行排序。默認狀況下不保證線程公平的訪問隊列

public ArrayBlockingQueue(int capacity, boolean fair){
  if(capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull = lock.newCondition();
}

 

LinkedBlockingQueue

  LinkeBlockingQueue的默認的和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

 

PriorityBlockingQueue    

  PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認狀況下元素採起天然順序升序排序。也能夠自定義類實現compareTo()方法來指定元素排序規則,或初始化PriorityBlockingQueue時指定構造參數Comparator來對元素進行排序。不能保證同優先級元素的順序。

 

DelayQueue  

  DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從對了了中提取元素。

  應用場景以下:

    緩存系統的設計:可使用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了

    定時任務調度:使用DelayQueue保存當前將會執行的任務和執行時間,一旦從DelayQueue中獲取到到任務就開始執行。

  實現Delayed接口:

    在建立對象時,初始化基本數據。使用time記錄當前對象延遲到何時可使用,使用sequenceNumber來標識元素在隊列中的前後順序。

private static final AtomicLong sequencer = new AtomicLong(0);
ScheduledFutureTask(Runnable r, V result, long ns, long period){
  super(r, result);
  this.time = ns;
  this.period = period;
  this.sequenceNumber = sequencer.getAndIncrement();
}

    實現getDelay方法。該方法返回當前元素還須要延時多長時間,單位是納秒。

public void getDelay(TimeUnit unit){
  return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

    實現compareTo方法來指定元素的順序。

public int compareTo(Delayed other){
  if(other == this)
    return 0;
  if(other instanceof ScheduledFutureTask){
    ScheduledFutureTask<> x = (ScheduledFutureTask<>) other;
    long diff = time - x.time;
    if(diff < 0)
      return -1;
    else if(diff > 0)
      return 1;
    else if(sequenceNumber < x.sequenceNumber)
      return -1;
    else 
      return 1;
  }
  long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
  return (d == 0) 0 : ((d < 0) -1 : 1);
}

  延時阻塞隊列的實現

    當消費者從隊列裏獲取元素時,若元素沒有達到延時時間,就阻塞當前線程

long delay = first.getDelay(TimeUnit.NANOSECONDS);
if(delay <= 0)
  return q.poll();
else if(leader != null)  // leader是一個等待獲取隊列頭部元素的線程。若leader不等於空,表示已經有線程在等待獲取隊列的頭元素
  available.await();
else{
  Thread thisThread = Thread.currentThread();
  leader = thisThread;
  try{
    available.awaitNanos(delay);
  }finally{
    if(leader == thisThread)
      leader = null;
  }
}

 

SynchronousQueue

  SynchronousQueue的每個put操做必須等待一個take操做,不然不能繼續添加元素。它支持公平訪問隊列。默認狀況下線程採用非公平性策略訪問隊列

public SynchronousQueue(boolean fair){
  transferer = fair ? new TransferQueue() : new TransferStack();
}

  SynchronousQueue負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素。

  

LinkedTransferQueue  

  transfer():當前有消費者正在等待接收元素(消費者使用take()或帶有時間限制的poll()),transfer方法能夠把生產者傳入的元素馬上transfer給消費者。若沒有消費者在等待接收元素,transfer方法將元素存放在隊列的tail節點,並等待該元素被消費者消費了才返回。

Node pre = tryAppend(s, haveData); //將當前節點做爲tail節點
return awaitMatch(s, pred, e, (how == TIMED), nanos);  //讓CPU自旋等待消費者消費元素

  tryTransfer():用來試探生產者傳入的元素是否能直接傳給消費者。若沒有消費者接受元素,則返回false。tryTransfer方法不管消費者是否接受都馬上返回,而transfer方法必須等到消費者消費後才返回。tryTransfer(E e, long timeout, TimeUnit unit)方法試圖把生產者傳入的元素直接傳給消費者,若沒有消費者消費該元素則等待指定的時間再返回,若超時還沒消費元素,則返回false,若在超時時間內消費了元素,則返回true。

 

LinkedBlockingDeque

  LinkedBlockingDeque是由鏈表組成的雙向阻塞隊列。雙向隊列指的是能夠從隊列的兩端插入和移除元素。雙向隊列由於多了一個操做對了的入口,在多線程同時入隊時,減小了一半的競爭。相對於其餘阻塞隊列,LinkedBlockingDeque多了addFirst,addList,offerFirst,offerLast,peekFirst和peekLast等方法。

 

阻塞隊列的實現原理

  使用通知模式的實現

    通知模式就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個對了中的元素後,會通知生產者當前隊列可用。

private final Condition notFull;
private finla Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair){
  notEmpty = lock.newCondition();
  notFull = lock.newCondition();
}

public void put(E e) throws InterruptedException{ 
  checkNotFull(e);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try{
    while(count == items.length)
      notFull.await();
    insert(e);
  }finally{
    lock.unlock();
  }
} 

public E take() throws InterruptedException(){
  final ReentrantLock lock = this.lock;
  lock.lokcInterruptibly();
  try{
    while(count == 0)
      notEmpty.await();
    return extract();
  }finally{
    lock.unlock();
  }
}

private void insert(E x){
  items[putIndex] = x;
  putIndex = inc(putIndex);
  ++count;
  notEmpty.signal();
}

  當往隊列裏插入一個元素時,若對了不可用,那麼阻塞生產者主要經過LockSupport.park(this)來實現

pubilc final void await() throws InterruptedException{
  if(Thread.interrupted())
    throw new InterruptedException();
  Node node = addConditionWaiter();
  int savedState = fullRelease(node);
  int interruptMode = 0;
  while(!isOnSyncQueue(node)){
    LockSupport.park(this);
    if((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  if(acquireQueued(node, savedState) && interrutMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if(node.nextWaiter != null)
    unlinkCancelledWaiters();
  if(interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}
public static void park(Object blocker){
  Thread t = Thread.currentThread();
  setBlocker(t, blocker);
  unsafe.park(false, 0L);
  setBlocker(t, null);
}

  park會阻塞當前線程,只有一下4種狀況中的一種發生時,該方法纔會返回:

    與park對應的unpark執行或已經執行時。「已經執行」是指unpark先執行,而後在執行park的狀況

    線程被中斷時

    等待完time參數指定的毫秒數時

    異常現象發生時

  JVM在linux下實現park的方式是使用系統方法pthread_cond_wait。pthread_cond_wait是一個多線程的條件變量函數。這個方法接受兩個參數:一個共享變量_cond,一個互斥量_mutex,unpark是使用pthread_cond_signal實現的。

 

Fork/Join

  Fork/Join是一個把大人物分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。

  工做竊取(work-stealing)算法

    工做竊取算法是指某個線程從其餘隊列裏竊取任務來執行。

    當把一個當任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列裏,並未每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。有的線程會先幹完本身的任務,此時其餘線程對應的隊列裏還有任務等待處理。因而該線程就去其餘線程的隊列裏竊取一個任務來作。此時他們會訪問同一個隊列,爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從隊列的頭部拿任務執行,竊取任務的線程從雙端隊列的尾部任務執行。

    優勢:充分利用線程進行並行計算,減小了線程間的競爭

    缺點:在雙端隊列裏只有一個任務時會存在競爭,而且該算法會消耗了更多的系統資源。

  設計

    分割任務:須要一個fork類來把大任務分割成子任務,須要不停地分割直到分割出的子任務足夠小。

    執行任務併合並結果:分割的子任務分別放在雙端隊列裏,而後幾個啓動線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據

  Fork/Join使用兩個類來完成上述設計:

    ForkJoinTask:須要使用ForkJoin框架,必須先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操做的機制。一般狀況下,咱們不須要直接繼承ForkJoinTask,只須要繼承它的子類,Fork/Join只提供了兩個子類:

      RecursiveAction:用於沒有返回結果的任務

      RecursiveTask:用於有返回結果的任務

    ForkJoinPool:ForkJoinTask須要經過ForkJoinPool來執行

      任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘線程的隊列的尾部獲取一個任務。

public class CountTask extends RecursiveTask<Integer>{
    
    private static final int THRESHOLD = 2;
    private int start;
    private int end;
    
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        if(canCompute){
            for(int i = start; i <= end; i++)
                sum += i;
        }else{
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            leftTask.fork();  // 每一個子任務調用fork方法時又會進入compute方法,看當前子任務是否有必要分割成子任務,若不須要則執行當前任務,不然繼續分割。
            rightTask.fork();
            int leftResult = leftTask.join(); 
            int rightResult = rightTask.join();
            sum = leftResult + rightResult;
        }
        return sum;
    }
    
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(1, 4);
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (Exception e) {
            
        }
    }

}

    ForkJoinTask在執行的時候可能會拋出異常,可是沒辦法在主線程中捕獲異常。ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已被取消。而且能夠經過ForkJoinTask的getException方法獲取異常。getException方法返回Throwable對象,若任務被取消了則返回CancellationException。若任務沒有完成或被拋出,則返回null。

 

  ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將程序提交給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。

    ForkJoinTask的fork方法實現原理

      當咱們調用ForkJoinTask的fork方法時,程序會調用ForkJoinWorkerThread的pushTask方法異步地執行這個任務,而後當即返回結果。

public final ForkJoinTask<V> fork(){
  ((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);
  return this;
}

    push方法把當前任務存放在ForkJoinTask數組隊列裏。而後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工做線程來執行。

final void pushTask(ForkJoinTask<> t){
  ForkJoinTask<>[] q;
  int s, m;
  if((q = queue) != null){
    long u = (((s = queueTop) & (m = q.length -1)) << ASHIFT) + ABASE;
    UNSAFE.putOrderedObject(q, u, t);
    queueTop = s + 1;
    if((s -= queueBase) <= 2)
      pool.signalWork();
    else if(s == m)
      growQueue(); 
  }
}

    ForkJoinTask的join方法實現原理

      Join方法的主要做用是阻塞當前線程並等待獲取結果。

public final V join(){
  jf(doJoin() != NORMAL)
    return reportResult();
  else
    return getRawResult();
}

private V reportResult(){
  int s;
  Throwable ex;
  if((s = status) == CANCELLED)
    throw new CancellationException();
  if(s == EXCEPTIONAL && (ex = getThrowableException()) != null)
    UNSAFE.throwException(ex);
  return getRawResult();
}

  調用doJoin()方法獲得當前任務的狀態來判斷返回什麼結果。任務狀態由4種:已完成(NORMAL),被取消(CANCELLED),信號(SIGNAL)和出現異常(EXCEPTIONAL)。

    若任務狀態是已完成,則直接返回任務結果

    若任務狀態是被取消,則直接拋出CancellationException

         若任務狀態是拋出異常,則直接拋出對應的異常

    

private int doJoin(){
  Thread t;
  ForkJoinWorkerThread w;
  int s;
  boolean completed;
  if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread){
    if((s = status) < 0) //查看狀態,若任務完成,則直接返回任務狀態
      return s;    if((w = (ForkJoinWorkerThread)t).unpushTask(this)){
      try{
        completed = exec();
      }catch(Throwable ex){
        return setExceptionalCompletion(rex);
      }
      if(completed)
        return setCompletion(NORMAL);
    }
    return w.joinTask(this);
  }else
    return externalAwaitDone();
}
相關文章
相關標籤/搜索