AQS源碼解讀 二

前面已經講解了AQS源碼的獨享模式,今天來講一下AQS的共享模式

 

下面以CountDownLatch去講解AQS的共享模式

 

首先講下什麼是CountDownLatch,CountDownLatch所描述的是」在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待「。在API中是這麼說的:

用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的線程,await 的所有後續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。

先看CountDownLatch的例子

 

public  static  void  main(String[] args) {
     final  CountDownLatch latch =  new  CountDownLatch( 2 );
  
     new  Thread(){
         public  void  run() {
             try  {
                 System.out.println( "線程1執行" );
                 Thread.sleep( 5000 );
                 latch.countDown();
             catch  (InterruptedException e) {
                 e.printStackTrace();
             }
         };
     }.start();
  
     new  Thread(){
         public  void  run() {
             try  {
                 System.out.println( "線程2執行" );
                 Thread.sleep( 3000 );
                 latch.countDown();
             catch  (InterruptedException e) {
                 e.printStackTrace();
             }
         };
     }.start();
  
     new  Thread(){
         public  void  run() {
             try  {
                 System.out.println( "線程3阻塞" );
                 latch.await();
                 System.out.println( "線程3繼續執行" );
             catch  (InterruptedException e) {
                 e.printStackTrace();
             }
         };
     }.start();
  
     try  {
         Thread.sleep( 1000 );
         System.out.println( "主線程線程阻塞" );
         latch.await();
     catch  (InterruptedException e) {
         e.printStackTrace();
     }
     System.out.println( "主線程繼續執行" );
  
}

 

線程3 和主線程會加入到隊列中

 

 node1會判斷前序節點是否是頭結點,如果是前序節點是頭節點 但是計數器不爲0 則阻塞自己 並將waitstatus狀態改爲-1 即SIGNAL

 node2 會判斷當前節點是否爲頭結點,前序節點不是頭結點 直接阻塞自己 並將waitstatus狀態改爲-1

 

如果計數器爲零,就會把node1給喚醒,喚醒後 node1將自己的節點設置爲頭結點 並將節點waitstatus狀態設置爲 -3 PROPAGATE

然後繼續執行for循環 這時候node2的前序節點是頭結點,然後繼續將節點node2設置爲頭結點,並將節點waitstatus狀態設置爲-3 即PROPAGATE

 

 

 

接着看CountDownLatch的源碼

public  class  CountDownLatch {
     /**
  * Synchronization control For CountDownLatch.
  * Uses AQS state to represent count.
  */
  private  static  final  class  Sync  extends  AbstractQueuedSynchronizer {
         private  static  final  long  serialVersionUID = 4982264981922014374L;
 
         Sync( int  count) {
             setState(count);
         }
 
         int  getCount() {
             return  getState();
         }
 
         protected  int  tryAcquireShared( int  acquires) {
             return  (getState() ==  0 ) ?  1  : - 1 ;
         }
 
         protected  boolean  tryReleaseShared( int  releases) {
             // Decrement count; signal when transition to zero
  for  (;;) {
                 int  c = getState();
                 if  (c ==  0 )
                     return  false ;
                 int  nextc = c- 1 ;
                 if  (compareAndSetState(c, nextc))
                     return  nextc ==  0 ;
             }
         }
     }
 
     private  final  Sync sync;
 
//構造一個用給定計數初始化的 CountDownLatch
  public  CountDownLatch( int  count) {
         if  (count <  0 throw  new  IllegalArgumentException( "count < 0" );
         this .sync =  new  Sync(count);
     }
}
 
 
 
 
public  void  await()  throws  InterruptedException {
     sync.acquireSharedInterruptibly( 1 );
}
 
 
public  boolean  await( long  timeout, TimeUnit unit)
     throws  InterruptedException {
     return  sync.tryAcquireSharedNanos( 1 , unit.toNanos(timeout));
}
 
 
public  void  countDown() {
     sync.releaseShared( 1 );
}
}
 

 

可以看出CountDownLatch內部依賴Sync實現,

Sync繼承AQS。CountDownLatch僅提供了一個構造方法:

CountDownLatch(int count) : 構造一個用給定計數初始化的 CountDownLatch 設置count

 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
}
Sync(int count) {
    setState(count);
}

設置state是count

 

看countDown方法

public  void  countDown() {
     sync.releaseShared( 1 );
}
public  final  boolean  releaseShared( int  arg) {
     if  (tryReleaseShared(arg)) { //如果此線程是被等待線程裏最後一個被釋放的線程 就去通知同步等待隊列裏的節點
         doReleaseShared();
         return  true ;
     }
     return  false ;
}

 

再看tryReleaseShared方法 

protected  boolean  tryReleaseShared( int  releases) {
         // Decrement count; signal when transition to zero
  for  (;;) {
             int  c = getState(); //獲取計數器的值
             if  (c ==  0 )
                 return  false ;
             int  nextc = c- 1 ; //每個被等待的線程執行完計數器減1
             if  (compareAndSetState(c, nextc)) //設置計數器的新值
                 return  nextc ==  0 ; //如果計數器爲0 返回true
         }
     }
}

 

 

再看doReleaseShared方法

private  void  doReleaseShared() {
     /*
  * Ensure that a release propagates, even if there are other
  * in-progress acquires/releases. This proceeds in the usual
  * way of trying to unparkSuccessor of head if it needs
  * signal. But if it does not, status is set to PROPAGATE to
  * ensure that upon release, propagation continues.
  * Additionally, we must loop in case a new node is added
  * while we are doing this. Also, unlike other uses of
  * unparkSuccessor, we need to know if CAS to reset status
  * fails, if so rechecking.
  */
  for  (;;) {
         Node h = head;
         if  (h !=  null  && h != tail) {
             int  ws = h.waitStatus; //如果頭結點是-1 (可以看下面wait方法有講解,已經把頭結點設置爲-1了 所以會走
//f (ws == Node.SIGNAL) 這一步
 
             if  (ws == Node.SIGNAL) {
                 if  (!compareAndSetWaitStatus(h, Node.SIGNAL,  0 )) //把頭結點再設置爲0 不成功自旋操作,直到設置成功
                     continue ;        i     // loop to recheck cases
  unparkSuccessor(h); //喚醒節點
             }
             else  if  (ws ==  0  &&
                      !compareAndSetWaitStatus(h,  0 , Node.PROPAGATE))
                 continue ;                 // loop on failed CAS
  }
         if  (h == head)                    // loop if head changed
  break ;
     }
}

 

 
再看unparkSuccessor方法
private  void  unparkSuccessor(Node node) {
     /*
  * If status is negative (i.e., possibly needing signal) try
  * to clear in anticipation of signalling. It is OK if this
  * fails or if status is changed by waiting thread.
  */
  int  ws = node.waitStatus; //因爲頭結點已經設置爲0了,所以ws<0不滿足
     if  (ws <  0 )
         compareAndSetWaitStatus(node, ws,  0 );
 
     /*
  * Thread to unpark is held in successor, which is normally
  * just the next node. But if cancelled or apparently null,
  * traverse backwards from tail to find the actual
  * non-cancelled successor.
  */
  Node s = node.next;
     if  (s ==  null  || s.waitStatus >  0 ) { //這一步也不滿足,可以看下面wait方法裏有講解 頭結點的後續節點的status都是-1
//所以這一步不滿足 直接走LockSupport.unpark(s.thread);喚醒頭結點的下一個節點
         s =  null ; //如果waitstatus>0說明 節點取消了 就找下一個waitstatus是-1的節點 並喚醒
         for  (Node t = tail; t !=  null  && t != node; t = t.prev)
             if  (t.waitStatus <=  0 )
                 s = t;
     }
     if  (s !=  null )
         LockSupport.unpark(s.thread);
}

 

再看wait方法

public  final  void  acquireSharedInterruptibly( int  arg)
         throws  InterruptedException {
     if  (Thread.interrupted())
         throw  new  InterruptedException();
     if  (tryAcquireShared(arg) <  0 ) //嘗試獲取鎖,獲取失敗就執行下面的方法
         doAcquireSharedInterruptibly(arg);
}

 

 

看tryAcquireShared方法

protected  int  tryAcquireShared( int  acquires) {
     return  (getState() ==  0 ) ?  1  : - 1 ;
}

 

如果state是0,說明被等待的線程全都執行完了 。return -1說明沒有執行完 

再看doAcquireSharedInterruptibly方法

private  void  doAcquireSharedInterruptibly( int  arg)
     throws  InterruptedException {
     final  Node node = addWaiter(Node.SHARED); //如果隊列是空的,就新建一個頭節點,頭節點指向尾節點,
    //然後再新建一個節點放在頭節點後面 如果隊列不爲空,就在尾節點後面新建一個節點。節點是shared類型的
//隊列節點的waitStatus默認是0 因爲上篇AQS源碼一種有講解,就不講那麼多了
     boolean  failed =  true ;
     try  {
         for  (;;) { //開啓自旋
             final  Node p = node.predecessor();
             if  (p == head) { //如果新建節點的前序節點是頭節點,而且state的值爲0 就走到setHeadAndPropagate方法
 
                 int  r = tryAcquireShared(arg);
                 if  (r >=  0 ) { //如果被等待的線程執行完了
 
                     setHeadAndPropagate(node, r); //把當前節點設置爲頭節點,而且喚醒後續掛起的節點
                     p.next =  null // help GC
  failed =  false ;
                     return                      p.next =  null // help GC
  failed =  false ;
                     return ;
                 }
             }
             if  (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt()) //如果當前節點的前序節點不是頭節點或者計數器不等於0,就阻塞當前節點
                 throw  throw  new  InterruptedException();
         }
     finally  {
         if  (failed)
             cancelAcquire(node);
     }
}

 

再看shouldParkAfterFailedAcquire方法
finally 
相關文章
相關標籤/搜索