java-AQS源碼淺析

AQS概念及定義

ASQ:AbstractQueuedSynchronizerhtml

它維護了一個volatile int state(表明共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列,有個內部類Node定義了節點。隊列由AQS的volatile成員變量head和tail組成一個雙向鏈表)node

資源共享方式

AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。設計模式

自定義同步器

AQS是抽象類,使用了模板方法設計模式,已經將流程定義好,且實現了對等待隊列的維護,所以實現者只須要按需實現AQS預留的四個方法便可。多線程

  • isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。

通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。併發

核心方法分析

1.1 acquire(int)app

方法定義

此方法是獨佔模式下線程獲取共享資源的頂層入口。若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。函數

方法源碼

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

函數流程

1. tryAcquire():嘗試獲取資源。
2. addWaiter(Node.EXCLUSIVE):獲取資源失敗,將該線程加入等待隊列尾部,標記爲獨佔模式。
3. acquireQueued(Node,int):獲取該node指定數量的資源數,會一直等待成功獲取才返回,返回值是在獲取期間是否中斷過

源碼分析

1. tryAcquire()高併發

/**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
這是個抽象方法,用於給實現者自定義實現,此方法嘗試去獲取獨佔資源。若是獲取成功,則直接返回true,不然直接返回false。這也正是tryLock()的語義。

2. addWaiter(Node)oop

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        //新建Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //快速嘗試一次,使用CAS將node放到隊尾,失敗調用enq
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //保證將Node放入隊尾
        enq(node);
        return node;
    }

enq源碼源碼分析

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //若是尾節點爲空,說明隊列還未進行初始化
            if (t == null) { // Must initialize
                //CAS設置頭結點
                if (compareAndSetHead(new Node()))
                    //初始頭尾相同,從下一次循環開始嘗試加入新Node
                    tail = head;
            } else {
                node.prev = t;
                //CAS將當前節點設置爲尾節點
                if (compareAndSetTail(t, node)) {
                    //設置成功返回當前節點
                    t.next = node;
                    return t;
                }
            }
        }
    }

3. acquireQueued(Node, int)

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        //標誌是否成功獲取資源
        boolean failed = true;
        try {
            //是否被中斷
            boolean interrupted = false;
            for (;;) {
                //獲取前驅Node
                final Node p = node.predecessor();
                //若是本身是隊列中第二個節點,那會進行嘗試獲取,進入這裏判斷要麼是一次,要麼是被前驅節點給unPark喚醒了。
                if (p == head && tryAcquire(arg)) {
                    //成功獲取資源,設置自身爲頭節點,將原來的頭結點剝離隊列
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判斷是否須要被park,若是須要進行park並檢測是否被中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //若是獲取資源失敗了將當前node取消,
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire方法

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //若是前驅的狀態已是signal,表明前驅釋放是會通知喚醒你,那麼此node能夠安心被park
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             //若是前驅已經被取消,那麼從當前node一直往前找,直到有非取消的node,直接排在它的後面,此時不須要park,會出去再嘗試一次獲取資源。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             //前驅節點沒有被取消,那麼告訴前驅節點釋放的時候通知本身
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt()

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        //讓該線程進入wait狀態
        LockSupport.park(this);
        //返回期間是被中斷過
        return Thread.interrupted();
    }

acquireQueued流程總結

  1. 檢查本身是不是老二,且是否能得到資源,能得到本身成爲head節點,不然進入流程2。

2.找到「有效」(not canceled)的前驅,並通知前驅釋放了要「通知」(watiStatus=signal)我,安心被park。
3。被前驅unpark,或interrrupt(),繼續流程1。

acquire小結

  1. 首先調用實現者實現的tryAcquire()去獲取資源,若是成功則直接返回。
  2. 若是失敗,則新建一個獨佔模式的節點加到隊列尾部。
  3. 通知一個有效的前驅記得釋放時喚醒本身,在喚醒時本身再進行不斷tryAcquire()直到獲取到資源,返回是否被中斷過。
  4. 若是等待過程當中被中斷過,則將將中斷補上,調用當前線程的interrupt().

至此acquire流程完結,


1.2 release(int)

方法定義

此方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。這也正是unlock()的語義。

方法源碼

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        //調用實現者的嘗試解鎖方法,由於已經得到鎖,因此基本不會失敗
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //喚醒下一個節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor()

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)//設置當前節點的狀態容許失敗,失敗了也不要緊。
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
         //找到下一個須要被喚醒的節點
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//若是是空或被取消
            s = null;
            //從尾節點開始尋找,直到找到最前面的有效的節點。由於鎖已經釋放,因此從尾節點開始找能夠避免由於高併發下複雜的隊列動態變化帶來的邏輯判斷,
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

release小結

  1. 首先調用實現者的tryRelease(),失敗則返回false
  2. 成功則找到下一個有效的節點並喚醒它。

注意實現者實現tryRelease應該是當state爲0時才返回

1.3 acquireShared(int)

方法定義

此方法是共享模式下線程獲取共享資源的頂層入口。若是獲取到資源,線程直接返回。若是有剩餘資源則會喚醒下一個線程,不然進入wait,且整個過程忽略中斷的影響。

方法源碼

/**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        //嘗試獲取指定數量資源
        if (tryAcquireShared(arg) < 0)
            //獲取資源直到成功
            doAcquireShared(arg);
    }

共享模式下的流程與獨佔模式極爲類似,首先根據tryAcquireShared(arg)嘗試是否能獲取到資源,能則直接返回,不能則會進入隊列按入隊順序依次喚醒嘗試獲取。

tryAcquireShared(int)

/**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.)  Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

這是AQS預留給實現者的方法,用於共享模式下嘗試獲取指定數量的資源,返回值<0表明獲取失敗,=0表明獲取成功且無剩餘資源,>0表明還有剩餘資源

doAcquireShared(int)方法用於共享模式獲取資源會直到獲取成功才返回

/**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        //添加當前線程的Node模式爲共享模式至隊尾,
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //獲取前驅節點
                final Node p = node.predecessor();
                //若是本身是老二纔有嘗試的資格
                if (p == head) {
                    //嘗試獲取指定數量資源
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //若是成功獲取,將當前節點設置爲頭節點,若是有剩餘資源喚醒下一有效節點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //若是有中斷,本身補償中斷
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //判斷是否須要被park,和park後檢查是否被中弄斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //若是獲取失敗,取消當前節點
            if (failed)
                cancelAcquire(node);
        }
    }

流程和獨佔模式幾乎如出一轍,可是代碼的書寫缺有不一樣,不知原做者是咋想的。區別於獨佔不一樣的有兩點

  1. 添加模式爲SHARED1的Node。
  2. 在成功獲取到資源後,設置當前節點爲head節點時,若是還有剩餘資源的話,會喚醒下一個有效的節點,若是資源數量不夠下一節點,下一節點會一直等待,直到其它節點釋放,並不會讓步給後面的節點,取決於FIFO的按順序出隊。

setHeadAndPropagate()看有剩餘資源的時候如何喚醒下一節點

/**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        //將當前節點設置爲head節點
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
         //若是有剩餘資源
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //當下一個有效節點存在且是共享模式時,會喚醒它
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

doReleaseShared()喚醒下一共享模式節點

/**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //若是頭結點狀態是「通知後繼」
                if (ws == Node.SIGNAL) {
                    //將其狀態改成0,表示已通知
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //喚醒後繼
                    unparkSuccessor(h);
                }
                //若是已通知後繼,則改成可傳播,在下次acquire中的shouldParkAfterFailedAcquire會將改成SIGNAL
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //若是頭結點變了,再次循環
            if (h == head)                   // loop if head changed
                break;
        }
    }

acquireShared小結

共享模式acquire與獨佔模式技術相同,惟一的不一樣就是在於若是當前節點獲取資源成功且有剩餘則會喚醒下一節點,資源能夠爲多個線程功能分配,而獨佔模式則就是一個線程獨佔。

1.4 releaseShared(int)

方法定義

此方法是共享模式下線程釋放共享資源的頂層入口。若是釋放資源成功,直接返回。若是有剩餘資源則會喚醒下一個線程,且整個過程忽略中斷的影響。

方法源碼

/**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        //嘗試共享模式獲取資源
        if (tryReleaseShared(arg)) {
            //喚醒下一節點
            doReleaseShared();
            return true;
        }
        return false;
    }

AQS的源碼分析就到這裏爲止因爲本人目前功力尚淺,對AQS的理解停留在代碼級別,下此會將應用補上,若有不對和遺漏歡迎各位補充。

參考文章
Java併發之AQS詳解

相關文章
相關標籤/搜索