AbstractQueuedSynchronizer

1 簡介

AbstractQueuedSynchronizer簡稱AQS是一個抽象同步框架,能夠用來實現一個依賴狀態的同步器。
JDK1.5中提供的java.util.concurrent包中的大多數的同步器(Synchronizer)如Lock, Semaphore, Latch, Barrier等,這些類之間大多能夠互相實現,如使用Lock實現一個Semaphore或者反過來,可是它們都是基於java.util.concurrent.locks.AbstractQueuedSynchronizer這個類的框架實現的,理解了這個稍微複雜抽象的類再去理解其餘的同步器就很輕鬆了。java

2 原理介紹

AQS的核心是一個線程等待隊列,採用的是一個先進先出FIFO隊列。用來實現一個非阻塞的同步器隊列有主要有兩個選擇Mellor-Crummey and Scott (MCS) locks和Craig, Landin, and Hagersten (CLH) locks的變種。CLH鎖更適合處理取消和超時,因此AQS基於CLH進行修改做爲線程等待隊列。
CLH隊列使用pred引用前節點造成一個隊列,入隊enqueue和出隊dequeue操做均可以經過原子操做完成。node

 

在 AQS 內部,經過維護一個FIFO 隊列來管理多線程的排隊工做。在公平競爭的狀況下,沒法獲取同步狀態的線程將會被封裝成一個節點,置於隊列尾部。入隊的線程將會經過自旋的方式獲取同步狀態,若在有限次的嘗試後,仍未獲取成功,線程則會被阻塞住。大體示意圖以下:算法

當頭結點釋放同步狀態後,且後繼節點對應的線程被阻塞,此時頭結點線程將會去喚醒後繼節點線程。後繼節點線程恢復運行並獲取同步狀態後,會將舊的頭結點從隊列中移除,並將本身設爲頭結點。大體示意圖以下:多線程

其中每一個節點包含以下狀態:app

/** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
CANCELED表示線程等待已經取消,是惟一一個大於0的狀態。
SINALG表示須要喚醒next節點
CONDITION代表線程正在等待一個條件
PROPAGATE用於acquireShared中向後傳播

3 acquire

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

一、若是嘗試獲取鎖成功整個獲取操做就結束,不然轉到2. 嘗試獲取鎖是經過方法tryAcquire來實現的,AQS中並無該方法的具體實現,只是簡單地拋出一個不支持操做異常,在AQS簡介中談到tryAcquire有不少實現方法,這裏再也不細化,只須要知道若是獲取鎖成功該方法返回true便可;框架

二、若是獲取鎖失敗,那麼就建立一個表明當前線程的結點加入到等待隊列的尾部,是經過addWaiter方法實現的,來看該方法的具體實現:oop

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

該方法建立了一個獨佔式結點,而後判斷隊列中是否有元素,若是有(pred!=null)就設置當前結點爲隊尾結點,即將當前節點插入到尾節點的後面,而後返回;ui

若是沒有元素(pred==null),表示隊列爲空,走的是入隊操做this

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法採用的是變種CLH算法,先看頭結點是否爲空,若是爲空就建立一個傀儡結點,頭尾指針都指向這個傀儡結點,這一步只會在隊列初始化時會執行;atom

若是頭結點非空,就採用CAS操做將當前結點插入到頭結點後面,若是在插入的時候尾結點有變化,就將尾結點向後移動直到移動到最後一個結點爲止,而後再把當前結點插入到尾結點後面,尾指針指向當前結點,入隊成功。

三、將新加入的結點放入隊列以後,這個結點有兩種狀態,要麼獲取鎖,要麼就掛起,若是這個結點不是頭結點的後繼節點,就看看這個結點是否應該掛起,若是應該掛起,就掛起當前結點,是否應該掛起是經過shouldParkAfterFailedAcquire方法來判斷的  

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //判斷前驅節點是不是頭節點,若是是,則循環獲取同步
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //若是獲取同步狀態失敗或者前驅節點不是頭節點,則判斷是否將當前節點掛起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }    

  

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
/**
 * 該方法主要用途是,當線程在獲取同步狀態失敗時,根據前驅節點的等待狀態,決定後續的動做。好比前驅
 * 節點等待狀態爲 SIGNAL,代表當前節點線程應該被阻塞住了。不能總是嘗試,避免 CPU 忙等。
 *    —————————————————————————————————————————————————————————————————
 *    | 前驅節點等待狀態 |                   相應動做                     |
 *    —————————————————————————————————————————————————————————————————
 *    | SIGNAL         | 阻塞                                          |
 *    | CANCELLED      | 向前遍歷, 移除前面全部爲該狀態的節點               |
 *    | waitStatus < 0 | 將前驅節點狀態設爲 SIGNAL, 並再次嘗試獲取同步狀態   |
 *    —————————————————————————————————————————————————————————————————
 */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //首先檢查前趨結點的waitStatus位,若是爲SIGNAL,表示前趨結點會通知它,那麼它能夠放心大膽地掛起了
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            //若是前趨結點是一個被取消的結點怎麼辦呢?那麼就向前遍歷跳過被取消的結點,直到找到一個沒有被取消的結點爲止,將找到的這個結點做爲它的前趨結點,
       //將找到的這個結點的waitStatus位設置爲SIGNAL,返回false表示線程不該該被掛起,繼續嘗試獲取鎖 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* * 等待狀態爲 0 或 PROPAGATE,設置前驅節點等待狀態爲 SIGNAL, * 並再次嘗試獲取同步狀態。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() {
    // 調用 LockSupport.park 阻塞本身
    LockSupport.park(this);
    return Thread.interrupted();
}

 4 release

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

  

一、release過程比acquire要簡單,首先調用tryRelease釋放鎖,若是釋放失敗,直接返回;

二、釋放鎖成功後須要喚醒繼任結點,是經過方法unparkSuccessor實現的

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

一、node參數傳進來的是頭結點,首先檢查頭結點的waitStatus位,若是爲負,表示頭結點還須要通知後繼結點,這裏不須要頭結點去通知後繼,所以將該該標誌位清0.

二、而後查看頭結點的下一個結點,若是下一個結點不爲空且它的waitStatus<=0,表示後繼結點沒有被取消,是一個能夠喚醒的結點,因而喚醒後繼結點返回;若是後繼結點爲空或者被取消了怎麼辦?尋找下一個可喚醒的結點,而後喚醒它返回。

這裏並無從頭向尾尋找,從隊列尾部開始向前查找,找到隊列最前面沒有被取消的節點,而後,將其喚醒。

爲何須要從隊列尾部開始向前查找呢?

由於在CLH隊列中的結點隨時有可能被中斷,被中斷的結點的waitStatus設置爲CANCEL,並且它會被踢出CLH隊列,如何個踢出法,就是它的前趨結點的next並不會指向它,而是指向下一個非CANCEL的結點,而它本身的next指針指向它本身。一旦這種狀況發生,如何從頭向尾方向尋找繼任結點會出現問題,由於一個CANCEL結點的next爲本身,那麼就找不到正確的繼任接點。

有的人又會問了,CANCEL結點的next指針爲何要指向它本身,爲何不指向真正的next結點?爲何不爲NULL?

第一個問題的答案是這種被CANCEL的結點最終會被GC回收,若是指向next結點,GC沒法回收。

相關文章
相關標籤/搜索