JUC回顧之-AQS同步器的實現原理

1.什麼是AQS?html

     AQS的核心思想是基於volatile int state這樣的volatile變量,配合Unsafe工具對其原子性的操做來實現對當前鎖狀態進行修改。同步器內部依賴一個FIFO的雙向隊列來完成資源獲取線程的排隊工做。java

2.同步器的應用node

 同步器主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態,對同步狀態的修改或者訪問主要經過同步器提供的3個方法:安全

  • getState() 獲取當前的同步狀態
  • setState(int newState) 設置當前同步狀態
  • compareAndSetState(int expect,int update) 使用CAS設置當前狀態,該方法可以保證狀態設置的原子性。

     同步器能夠支持獨佔式的獲取同步狀態,也能夠支持共享式的獲取同步狀態,這樣能夠方便實現不一樣類型的同步組件。數據結構

     同步器也是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。架構

3.AQS同步隊列併發

   同步器AQS內部的實現是依賴同步隊列(一個FIFO的雙向隊列,其實就是數據結構雙向鏈表)來完成同步狀態的管理。app

   當前線程獲取同步狀態失敗時,同步器AQS會將當前線程和等待狀態等信息構形成爲一個節點(node)加入到同步隊列,同時會阻塞當前線程;less

   當同步狀態釋放的時候,會把首節點中的線程喚醒,使首節點的線程再次嘗試獲取同步狀態。AQS是獨佔鎖和共享鎖的實現的父類。   ide

 

4.AQS鎖的類別:分爲獨佔鎖和共享鎖兩種。

  • 獨佔鎖:鎖在一個時間點只能被一個線程佔有。根據鎖的獲取機制,又分爲「公平鎖」和「非公平鎖」。等待隊列中按照FIFO的原則獲取鎖,等待時間越長的線程越先獲取到鎖,這就是公平的獲取鎖,即公平鎖。而非公平鎖,線程獲取的鎖的時候,無視等待隊列直接獲取鎖。ReentrantLock和ReentrantReadWriteLock.Writelock是獨佔鎖。
  • 共享鎖:同一個時候可以被多個線程獲取的鎖,能被共享的鎖。JUC包中ReentrantReadWriteLock.ReadLock,CyclicBarrier,CountDownLatch和Semaphore都是共享鎖。

  JUC包中的鎖的包括:Lock接口,ReadWriteLock接口;Condition條件,LockSupport阻塞原語。      

  AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三個抽象類,

  ReentrantLock獨佔鎖,ReentrantReadWriteLock讀寫鎖。CountDownLatch,CyclicBarrier和Semaphore也是經過AQS來實現的。

  下面是AQS和使用AQS實現的一些鎖,以及經過AQS實現的一些工具類的架構圖:

 

                         圖 1.依賴AQS實現的鎖和工具類

                                                                                

 

5.AQS同步器的結構:同步器擁有首節點(head)和尾節點(tail)。同步隊列的基本結構以下:

 

                                                       圖 1.同步隊列的基本結構 compareAndSetTail(Node expect,Node update)

  • 同步隊列設置尾節點(未獲取到鎖的線程加入同步隊列): 同步器AQS中包含兩個節點類型的引用:一個指向頭結點的引用(head),一個指向尾節點的引用(tail),當一個線程成功的獲取到鎖(同步狀態),其餘線程沒法獲取到鎖,而是被構形成節點(包含當前線程,等待狀態)加入到同步隊列中等待獲取到鎖的線程釋放鎖。這個加入隊列的過程,必需要保證線程安全。不然若是多個線程的環境下,可能形成添加到隊列等待的節點順序錯誤,或者數量不對。所以同步器提供了CAS原子的設置尾節點的方法(保證一個未獲取到同步狀態的線程加入到同步隊列後,下一個未獲取的線程纔可以加入)。  以下圖,設置尾節點:

 圖 2.尾節點的設置  節點加入到同步隊列

  •  同步隊列設置首節點(原頭節點釋放鎖,喚醒後繼節點):同步隊列遵循FIFO,頭節點是獲取鎖(同步狀態)成功的節點,頭節點在釋放同步狀態的時候,會喚醒後繼節點,然後繼節點將會在獲取鎖(同步狀態)成功時候將本身設置爲頭節點。設置頭節點是由獲取鎖(同步狀態)成功的線程來完成的,因爲只有一個線程可以獲取同步狀態,則設置頭節點的方法不須要CAS保證,只須要將頭節點設置成爲原首節點的後繼節點 ,並斷開原頭結點的next引用。以下圖,設置首節點:

圖 3.首節點的設置

 6.獨佔式的鎖的獲取:調用同步器的acquire(int arg)方法能夠獲取同步狀態,該方法對中斷不敏感,即線程獲取同步狀態失敗後進入同步隊列,後續對線程進行中斷操做時,線程不會從同步隊列中移除。

    (1) 當前線程實現經過tryAcquire()方法嘗試獲取鎖,獲取成功的話直接返回,若是嘗試失敗的話,進入等待隊列排隊等待,能夠保證線程安全(CAS)的獲取同步狀態。

    (2) 若是嘗試獲取鎖失敗的話,構造同步節點(獨佔式的Node.EXCLUSIVE),經過addWaiter(Node node,int args)方法,將節點加入到同步隊列的隊列尾部。

    (3) 最後調用acquireQueued(final Node node, int args)方法,使該節點以死循環的方式獲取同步狀態,若是獲取不到,則阻塞節點中的線程。acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點的時候才能嘗試獲取鎖(同步狀態)( p == head && tryAcquire(arg))。

    緣由是:1.頭結點是成功獲取同步狀態的節點,而頭結點的線程釋放鎖之後,將喚醒後繼節點,後繼節點線程被喚醒後要檢查本身的前驅節點是否爲頭結點。

                2.維護同步隊列的FIFO原則,節點進入同步隊列之後,就進入了一個自旋的過程,每一個節點(後者說每一個線程)都在自省的觀察。

 下圖爲節點自旋檢查本身的前驅節點是否爲頭結點:

                              圖 4 節點自旋獲取同步狀態

 

獨佔式的鎖的獲取源碼:

acquire方法源碼以下
/**
     * Acquires in exclusive(互斥) mode, ignoring(忽視) interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued(排隊), possibly
     * repeatedly(反覆的) blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed(傳達) to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     *        
     *  獨佔式的獲取同步狀態      
     *        
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


 

  嘗試獲取鎖:tryAcquire方法:若是獲取到了鎖,tryAcquire返回true,反之,返回false。

//方法2:
    protected final boolean tryAcquire(int acquires) {
        // 獲取當前線程
        final Thread current = Thread.currentThread();
        // 獲取「獨佔鎖」的狀態,獲取父類AQS的標誌位
        int c = getState();
        //c == 0 意思是鎖(同步狀態)沒有被任何線程所獲取
        //1.當前線程是不是同步隊列中頭結點Node,若是是的話,則獲取該鎖,設置鎖的狀態,並設置鎖的擁有者爲當前線程
        if (c == 0) {
            if (!hasQueuedPredecessors() &&

// 修改下狀態爲,這裏的acquires的值是1,是寫死的調用子類的lock的方法的時候傳進來的,若是c == 0,compareAndSetState操做會更新成功爲1. compareAndSetState(0, acquires)) {
// 上面CAS操做更新成功爲1,表示當前線程獲取到了鎖,由於將當前線程設置爲AQS的一個變量中,表明這個線程拿走了鎖。 setExclusiveOwnerThread(current);
return true; } } //2.若是c不爲0,即狀態不爲0,表示鎖已經被拿走。
//由於ReetrantLock是可重入鎖,是能夠重複lock和unlock的,因此這裏還要判斷一次,獲取鎖的線程是否爲當前請求鎖的線程。 else if (current == getExclusiveOwnerThread()) {
//若是是,state繼續加1,這裏nextc的結果就會 > 1,這個判斷表示獲取到的鎖的線程,還能夠再獲取鎖,這裏就是說的可重入的意思
int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

addWaiter方法的源碼:回到aquire方法,若是嘗試獲取同步狀態(鎖)失敗的話,則構造同步節點(獨佔式的Node.EXCLUSIVE),
經過addWaiter(Node node,int args)方法
將該節點加入到同步隊列的隊尾。
 
/**
    * Creates and enqueues node for current thread and given mode.
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    * 
    * 
    * 若是嘗試獲取同步狀態失敗的話,則構造同步節點(獨佔式的Node.EXCLUSIVE),經過addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。
    * 
    */
    private Node addWaiter(Node mode) {
// 用當前線程夠着一個Node對象,mode是一個表示Node類型的字段,或者說是這個節點是獨佔的仍是共享的,或者說AQS的這個隊列中,哪些節點是獨佔的,哪些節點是共享的。 Node node
= new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;
//隊列不爲空的時候
if (pred != null) { node.prev = pred; // 確保節點可以被線程安全的添加,使用CAS方法
// 嘗試修改成節點爲最新的節點,若是修改失敗,意味着有併發,這個時候進入enq中的死循環,進行「自旋」的方式修改 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
//進入自旋 enq(node);
return node; }




enq方法的源碼:同步器經過死循環的方式來保證節點的正確添加,在「死循環」 中經過CAS將節點設置成爲尾節點以後,當前線程才能從該方法中返回,不然
當前線程不斷的嘗試設置。

enq方法將併發添加節點的請求經過CAS變得「串行化」了。
/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     * 
     * 同步器經過死循環的方式來保證節點的正確添加,在「死循環」 中經過CAS將節點設置成爲尾節點以後,當前線程才能從該方法中返回,不然當前線程不斷的嘗試設置。
     * enq方法將併發添加節點的請求經過CAS變得「串行化」了。
     * 
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 

acquireQueued方法:在隊列中的線程獲取鎖的過程:
/**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    * 
    * acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態(鎖)( p == head && tryAcquire(arg))
    *     緣由是:1.頭結點是成功獲取同步狀態(鎖)的節點,而頭節點的線程釋放了同步狀態之後,將會喚醒其後繼節點,後繼節點的線程被喚醒後要檢查本身的前驅節點是否爲頭結點。
    *           2.維護同步隊列的FIFO原則,節點進入同步隊列以後,就進入了一個自旋的過程,每一個節點(或者說是每一個線程)都在自省的觀察。
    * 
    */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
//死循環檢查(自旋檢查)當前節點的前驅節點是否爲頭結點,才能獲取鎖
for (;;) {
// 獲取節點的前驅節點
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//節點中的線程循環的檢查,本身的前驅節點是否爲頭節點
//將當前節點設置爲頭結點,移除以前的頭節點 setHead(node); p.next
= null; // help GC failed = false; return interrupted; }
// 不然檢查前一個節點的狀態,看當前獲取鎖失敗的線程是否要掛起
if (shouldParkAfterFailedAcquire(p, node) &&
//若是須要掛起,藉助JUC包下面的LockSupport類的靜態方法park掛起當前線程,直到被喚醒
parkAndCheckInterrupt()) interrupted = true; } } finally {
//若是有異常
if (failed)
//取消請求,將當前節點從隊列中移除 cancelAcquire(node); } }


 

獨佔式的獲取同步狀態的流程以下: 

圖5 獨佔式的獲取同步狀態的流程

 7.獨佔鎖的釋放:下面直接看源碼:

 

 /* 
1. unlock():unlock()是解鎖函數,它是經過AQS的release()函數來實現的。 * 在這裏,「1」的含義和「獲取鎖的函數acquire(1)的含義」同樣,它是設置「釋放鎖的狀態」的參數。 * 因爲「公平鎖」是可重入的,因此對於同一個線程,每釋放鎖一次,鎖的狀態-1。 unlock()在ReentrantLock.java中實現的,源碼以下:
*/ public void unlock() { sync.release(1); }

 

release()會調用tryRelease方法嘗試釋放當前線程持有的鎖(同步狀態),成功的話喚醒後繼線程,並返回true,不然直接返回false

    /**
    * Releases in exclusive mode.  Implemented by unblocking one or
    * more threads if {@link #tryRelease} returns true.
    * This method can be used to implement method {@link Lock#unlock}.
    *
    * @param arg the release argument.  This value is conveyed to
    *        {@link #tryRelease} but is otherwise uninterpreted and
    *        can represent anything you like.
    * @return the value returned from {@link #tryRelease}
    * 
    * 
    * 
    */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

 // tryRelease() 嘗試釋放當前線程的同步狀態(鎖)
  protected final boolean tryRelease(int releases) {
            //c爲釋放後的同步狀態
          int c = getState() - releases;
          //判斷當前釋放鎖的線程是否爲獲取到鎖(同步狀態)的線程,不是拋出異常(非法監視器狀態異常)
          if (Thread.currentThread() != getExclusiveOwnerThread())
              throw new IllegalMonitorStateException();
          boolean free = false;
          //若是鎖(同步狀態)已經被當前線程完全釋放,則設置鎖的持有者爲null,同步狀態(鎖)變的可獲取
          if (c == 0) {
              free = true;
              setExclusiveOwnerThread(null);
          }
          setState(c);
          return free;
      }
      

 釋放鎖成功後,找到AQS的頭結點,並喚醒它便可:

// 4. 喚醒頭結點的後繼節點
     
     private void unparkSuccessor(Node node) {
         //獲取頭結點(線程)的狀態
        int ws = node.waitStatus;
        //若是狀態<0,設置當前線程對應的鎖的狀態爲0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
            
        Node s = node.next;
        
         //解釋:Thread to unpark is held in successor, which is normally just the next node. 
         //But if cancelled or apparently(顯然) null, traverse backwards(向後遍歷) from tail to find the actual(實際的) non-cancelled successor(前繼節點).
 //從隊列尾部開始往前去找最前面的一個waitStatus小於0的節點。         if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒後繼節點對應的線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

 上面說的是ReentrantLock的公平鎖獲取和釋放的AQS的源碼,惟獨還剩下一個非公平鎖NonfairSync沒說,其實,它和公平鎖的惟一區別就是獲取鎖的方式不一樣,公平鎖是按先後順序一次獲取鎖,非公平鎖是搶佔式的獲取鎖,那ReentrantLock中的非公平鎖NonfairSync是怎麼實現的呢?

 /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

非公平鎖的lock的時候多了上面加粗的代碼:在lock的時候先直接用cas判斷state變量是否爲0(嘗試獲取鎖),成功的話更新成1,表示當前線程獲取到了鎖,不須要在排隊,從而直接搶佔的目的。而對於公平鎖的lock方法是一開始就走AQS的雙向隊列排隊獲取鎖。更詳細的關於ReentrantLock的實現請看後面寫的一篇文章:http://www.cnblogs.com/200911/p/6035765.html

 

 總結:在獲取同步狀態的時候,同步器維護一個同步隊列,獲取失敗的線程會被加入到隊列中並在隊列中自旋;移除隊列(或中止自旋)的條件是前驅節點爲頭結點而且獲取到了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int args)方法釋放同步狀態,而後喚醒頭結點的後繼節點。AQS的實現思路其實並不複雜,用一句話準確的描述的話,其實就是使用標誌狀態位status(volatile int state)和 一個雙向隊列的入隊和出隊來實現。AQS維護一個線程什麼時候訪問的狀態,它只是對狀態負責,而這個狀態的含義,子類能夠本身去定義。

 

 

 

本身註釋的AQS的源碼:以下:

 

 

public class AbstractQueuedSynchronizerTest {

    /**
     * 
     * (AQS節點的定義,同步隊列的節點定義)
     *
     * <p>
     * 修改歷史:                                            <br>  
     * 修改日期            修改人員       版本             修改內容<br>  
     * -------------------------------------------------<br>  
     * 2016年7月4日 上午10:26:38   user     1.0        初始化建立<br>
     * </p> 
     *
     * @author        Peng.Li 
     * @version        1.0  
     * @since        JDK1.7
     */
    static final class Node {

        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode 
         * 
         * */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled 
         *     在同步隊列中等待的線程等待超時或者被中斷,須要從同步隊列中取消等待
         * */
        static final int CANCELLED = 1;

        /** waitStatus value to indicate successor's thread needs unparking(喚醒)
         *     後繼節點的線程處於等待狀態,而當前的節點若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行。
         **/
        static final int SIGNAL = -1;

        /** waitStatus value to indicate thread is waiting on condition 
         *  節點在等待隊列中,節點的線程等待在Condition上,當其餘線程對Condition調用了signal()方法後,該節點會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中
         **/
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally(無條件的) propagate(傳播)
         * 
         * 表示下一次共享式同步狀態獲取將會被無條件地傳播下去
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated(傳播) to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened(干涉).
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         * 
         * 使用CAS更改狀態,volatile保證線程可見性,即被一個線程修改後,狀態會立馬讓其餘線程可見。
         * 
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueing(入隊), and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         * 
         * 前驅節點,當前節點加入到同步隊列中被設置
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         * 
         * 後繼節點
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         * 
         * 獲取同步狀態的線程
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive(獨有的) mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred(移動到) to the queue(同步隊列) to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         * 
         * 等待隊列中的後繼節點,若是當前節點是共享的,那麼這個字段是一個SHARED常量,
         * 也就是說節點類型(獨佔和共享)和等待隊列中的後繼節點共用同一個字段。
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() { // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) { // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /**
     * Head of the wait queue, lazily initialized.  Except for (除...之外)
     * initialization(初始化), it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.(若是head引用已經存在,等待狀態保證不會被取消)
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue(等待隊列), lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     * 同步狀態,線程可見的,共享內存裏面保存
     * 
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     * 
     * 獲得同步狀態的值
     * 
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Acquires in exclusive(互斥) mode, ignoring(忽視) interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued(排隊), possibly
     * repeatedly(反覆的) blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed(傳達) to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     *        
     *  獨佔式的獲取同步狀態      
     *        
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
    * Creates and enqueues node for current thread and given mode.
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    * 
    * 
    * 若是嘗試獲取同步狀態失敗的話,則構造同步節點(獨佔式的Node.EXCLUSIVE),經過    addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。

    * 
    */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 確保節點可以被安全的添加
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Convenience method to interrupt current thread.
     * 分析:若是在acquireQueued()中,當前線程被中斷過,則執行selfInterrupt();不然不會執行。
     * 線程在阻塞狀態被「中斷喚醒」而獲取CPU的執行權;可是該線程前面還有其餘等待鎖的線程,根據公平性原則,該線程仍然沒法獲取到鎖,他會再次阻塞。
     * 直到該線程被他前面等待鎖的線程喚醒;線程纔會獲取鎖。該線程「成功獲取鎖並真正執行起來以前」,他的中斷會被忽略而且中斷標記會被清除,由於在parkAndCheckInterrupt()中,
     * 咱們線程的中斷狀態時調用了Thread.interrupted(),這個函數在返回中斷狀態以後,還會清除中斷狀態,正由於清除了中斷狀態,因此在selfInterrupt從新產生一箇中斷。
     * 
     * 
     * 當前線程本身產生一箇中斷
     */
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    * 
    * acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態( p == head && tryAcquire(arg))
    *     緣由是:1.頭結點是成功獲取同步狀態的節點,而頭節點的線程釋放了同步狀態之後,將會喚醒其後繼節點,後繼節點的線程被喚醒後要檢查本身的前驅節點是否爲頭結點。
    *           2.維護同步隊列的FIFO原則,節點進入同步隊列以後,就進入了一個自旋的過程,每一個節點(或者說是每一個線程)都在自省的觀察。
    * 
    */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     * 
     * 同步器經過死循環的方式來保證節點的正確添加,在「死循環」 中經過CAS將節點設置成爲尾節點以後,當前線程才能從該方法中返回,不然當前線程不斷的嘗試設置。
     * enq方法將併發添加節點的請求經過CAS變得「串行化」了。
     * 
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
    * Convenience method to park and then check if interrupted
    *
    * @return {@code true} if interrupted
    * 
    * 阻塞當前線程
    * 
    */
    private final boolean parkAndCheckInterrupt() {
        // 阻塞當前線程
        LockSupport.park(this);
        // 線程被喚醒以後的中斷狀態
        return Thread.interrupted();
    }

    /**
    * Releases in exclusive mode.  Implemented by unblocking one or
    * more threads if {@link #tryRelease} returns true.
    * This method can be used to implement method {@link Lock#unlock}.
    *
    * @param arg the release argument.  This value is conveyed to
    *        {@link #tryRelease} but is otherwise uninterpreted and
    *        can represent anything you like.
    * @return the value returned from {@link #tryRelease}
    * 
    * 釋放公平鎖
    * 
    */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
    * Checks and updates status for a node that failed to acquire.
    * Returns true if thread should block. This is the main signal
    * control in all acquire loops.  Requires that pred == node.prev
    *
    * @param pred node's predecessor holding status
    * @param node the node
    * @return {@code true} if thread should block
    * 返回當前線程是否應該阻塞
    * 
    * 說明:
    (01) 關於waitStatus請參考下表(中擴號內爲waitStatus的值),更多關於waitStatus的內容,能夠參考前面的Node類的介紹。

    CANCELLED[1]  -- 當前線程已被取消
    SIGNAL[-1]    -- 「當前線程的後繼線程須要被unpark(喚醒)」。通常發生狀況是:當前線程的後繼線程處於阻塞狀態,而當前線程被release或cancel掉,所以須要喚醒當前線程的後繼線程。
    CONDITION[-2] -- 當前線程(處在Condition休眠狀態)在等待Condition喚醒
    PROPAGATE[-3] -- (共享鎖)其它線程獲取到「共享鎖」
    [0]           -- 當前線程不屬於上面的任何一種狀態。
    (02) shouldParkAfterFailedAcquire()經過如下規則,判斷「當前線程」是否須要被阻塞。

    規則1:若是前繼節點狀態爲SIGNAL,代表當前節點須要被unpark(喚醒),此時則返回true。
    規則2:若是前繼節點狀態爲CANCELLED(ws>0),說明前繼節點已經被取消,則經過先前回溯找到一個有效(非CANCELLED狀態)的節點,並返回false。
    規則3:若是前繼節點狀態爲非SIGNAL、非CANCELLED,則設置前繼的狀態爲SIGNAL,並返回false。
    * 
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驅節點的狀態
        int ws = pred.waitStatus;
        // 若是前驅節點是SIGNAL狀態,則意味着當前線程須要unpark喚醒,此時返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release to signal it, so it can safely park.
             */
            return true;
        // 若是前繼節點是取消的狀態,則設置當前節點的「當前前繼節點爲」原節點的前繼節點
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure
             * it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                    && pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    /**
    * Wakes up node's successor, if one exists.
    *
    * @param node the node
    */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling. It is OK if this fails or if
         * status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally just the next node. But if cancelled or apparently null, traverse
         * backwards from tail to find the actual non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a <tt>volatile</tt> read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }

    /**
     * CAS next field of a node.
     */
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * Setup to support compareAndSet. We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicInteger, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics(編譯器內部函數) API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));

        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

}
View Code

 

AbstractOwnableSynchronizer的源碼以下:

 

package concurrentMy.aqs;

/**
 * 
 * (設置和獲取鎖的持有者線程)
 *
 * <p>
 * 修改歷史:                                            <br>  
 * 修改日期            修改人員       版本             修改內容<br>  
 * -------------------------------------------------<br>  
 * 2016年7月5日 下午3:42:37   user     1.0        初始化建立<br>
 * </p> 
 *
 * @author        Peng.Li 
 * @version        1.0  
 * @since        JDK1.7
 */
public abstract class AbstractOwnableSynchronizerTest implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    /**
     * Empty constructor for use by subclasses.
     */
    protected AbstractOwnableSynchronizerTest() {
    }

    /**
     * The current owner of exclusive mode synchronization.
     * 
     * 加 transient 表示exclusiveOwnerThread不能被串行化,不會被做爲序列化的一部分
     * 
     * 鎖的持有線程
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access. A
     * <tt>null</tt> argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * <tt>volatile</tt> field accesses.
     * 
     * protected final來修飾,表示子類可使用這個方法,可是不能重載這個方法,也就是不能修改這個方法
     */
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }

    /**
     * Returns the thread last set by
     * <tt>setExclusiveOwnerThread</tt>, or <tt>null</tt> if never
     * set.  This method does not otherwise impose any synchronization
     * or <tt>volatile</tt> field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
View Code

 

 

參考文章:

1.Doug Lea的論文: http://gee.cs.oswego.edu/dl/papers/aqs.pdf 

2. 深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的實現分析(上): http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer

3. 深度解析Java 8:AbstractQueuedSynchronizer的實現分析(下): http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer

4. 深刻淺出 Java Concurrency (7): 鎖機制 part 2 AQS: http://www.blogjava.net/xylz/archive/2010/07/06/325390.html

5 AQS:http://www.cnblogs.com/leesf456/p/5350186.html

6.參考:https://tech.meituan.com/Java_Lock.html

相關文章
相關標籤/搜索