Java併發包源碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別

Java併發包源碼學習系列:AQS共享模式獲取與釋放資源

往期回顧:html

上一篇文章介紹了AQS內置隊列節點的出隊入隊操做,以及獨佔式獲取共享資源與釋放資源的詳細流程,爲告終構完整,本篇繼續以AQS的角度介紹另一種:共享模式獲取與釋放資源的細節,本篇暫不分析具體子類如ReentrantLock、ReentrantReadWriteLock的實現,以後會陸續補充。java

獨佔式獲取資源

友情提示:本篇文章着重介紹共享模式獲取和釋放資源的特色,許多代碼實現上面和共享式和獨佔式其實邏輯差很少,爲了清晰對比,這邊會將獨佔式的部分核心代碼粘貼過來,注意理解共享式和獨佔式存在差別的地方。詳細解析可戳:Java併發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放node

void acquire(int arg)

public final void acquire(int arg) {
        if (!tryAcquire(arg) && // tryAcquire由子類實現,表示獲取鎖,若是成功,這個方法直接返回了
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若是獲取失敗,執行
            selfInterrupt();
    }

boolean acquireQueued(Node, int)

// 這個方法若是返回true,代碼將進入selfInterrupt()
	final boolean acquireQueued(final Node node, int arg) {
        // 注意默認爲true
        boolean failed = true;
        try {
            // 是否中斷
            boolean interrupted = false;
            // 自旋,即死循環
            for (;;) {
                // 獲得node的前驅節點
                final Node p = node.predecessor();
                // 咱們知道head是虛擬的頭節點,p==head表示若是node爲阻塞隊列的第一個真實節點
                // 就執行tryAcquire邏輯,這裏tryAcquire也須要由子類實現
                if (p == head && tryAcquire(arg)) {
                    // tryAcquire獲取成功走到這,執行setHead出隊操做 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 走到這有兩種狀況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了
                // 這裏就判斷 若是當前線程爭鎖失敗,是否須要掛起當前這個線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 死循環退出,只有tryAcquire獲取鎖失敗的時候failed才爲true
            if (failed)
                cancelAcquire(node);
        }
    }

獨佔式釋放資源

boolean release(int arg)

public final boolean release(int arg) {
        if (tryRelease(arg)) { // 子類實現tryRelease方法
            // 得到當前head
            Node h = head;
            // head不爲null而且head的等待狀態不爲0
            if (h != null && h.waitStatus != 0)
                // 喚醒下一個能夠被喚醒的線程,不必定是next哦
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

void unparkSuccessor(Node node)

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        // 若是node的waitStatus<0爲signal,CAS修改成0
        // 將 head 節點的 ws 改爲 0,清除信號。表示,他已經釋放過了。不能重複釋放。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 喚醒後繼節點,可是有可能後繼節點取消了等待 即 waitStatus == 1
        Node s = node.next;
        // 若是後繼節點爲空或者它已經放棄鎖了
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 從隊尾往前找,找到沒有沒取消的全部節點排在最前面的【直到t爲null或t==node才退出循環嘛】
            for (Node t = tail; t != null && t != node; t = t.prev)
                // 若是>0表示節點被取消了,就一直向前找唄,找到以後不會return,還會一直向前
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 若是後繼節點存在且沒有被取消,會走到這,直接喚醒後繼節點便可
        if (s != null)
            LockSupport.unpark(s.thread);
    }

共享式獲取資源

void acquireShared(int arg)

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) //子類實現
            doAcquireShared(arg);
    }
  • tryAcquireShared(int)是AQS提供給子類實現的鉤子方法,子類能夠自定義實現共享式獲取資源的方式,獲取狀態失敗返回小於0,返回零值表示被獨佔方式獲取,返回正值表示共享方式獲取。
  • 若是獲取失敗,則進入doAcquireShared(arg);的邏輯。

void doAcquireShared(int arg)

注意這裏和獨佔式獲取資源acquireQueued的區別。編程

private void doAcquireShared(int arg) {
        // 包裝成共享模式的節點,入隊
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 嘗試獲取同步狀態,子類實現
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 設置新的首節點,並根據條件,喚醒下一個節點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

咱們能夠看到有幾個存在差別的地方:併發

  1. 在共享式獲取資源失敗的時候,會包裝成SHARED模式的節點入隊。
  2. 若是前驅節點爲head,則使用tryAcquireShared方法嘗試獲取同步狀態,這個方法由子類實現。
  3. 若是獲取成功r>=0,這時調用setHeadAndPropagate(node, r),該方法首先會設置新的首節點,將第一個節點出隊,接着會不斷喚醒下一個共享模式節點,實現同步狀態被多個線程共享獲取。

接下來咱們着重看下setHeadAndPropagate方法。app

void setHeadAndPropagate(Node node, int propagate)

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 節點出隊,設置node爲新的head
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        // 這個方法進來的時候propagate>=0
        // propagate>0表示同步狀態還能夠被後面的節點獲取
        // h指向原先的head節點,以後h = head,h表示新的head節點
        // h.waitStatus<0表示該節點後面還有節點須要被喚醒
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 獲取下一個節點
            Node s = node.next;
            // 沒有下一個節點或下一個節點爲共享式獲取狀態
            if (s == null || s.isShared())
                // 喚醒後續的共享式獲取同步狀態的節點
                doReleaseShared();
        }
    }
  • 先記錄一下原來的頭節點,而後設置node爲新的頭節點。
  • 原先的頭節點或新的頭節點等待狀態是propagate或signal,能夠繼續向下喚醒。
  • 若是判斷下個節點爲shared節點,調用共享式釋放資源方法喚醒後續節點。

共享式釋放資源

boolean releaseShared(int arg)

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 子類實現
            doReleaseShared();
            return true;
        }
        return false;
    }

doReleaseShared()

能夠發現共享模式下,不管是獲取資源仍是釋放資源都調用了doReleaseShared方法,可見該方法是共享模式釋放資源喚醒節點的核心方法,主要功能是喚醒下一個線程或者設置傳播狀態oop

後繼線程被喚醒後,會嘗試獲取共享鎖,若是成功以後,則又會調用setHeadAndPropagate,將喚醒傳播下去。這個方法的做用是保障在acquire和release存在競爭的狀況下,保證隊列中處於等待狀態的節點可以有辦法被喚醒。學習

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        // 自旋
        for (;;) {
            Node h = head;
            // 隊列已經初始化且至少有一個節點
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 不管是獨佔仍是共享,只有節點的ws爲signal的時候,纔會在釋放的時候,喚醒後面的節點
                if (ws == Node.SIGNAL) {
                    // cas將ws設置爲0,設置失敗,將會繼續從循環開始
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 喚醒後繼節點,unparkSuccessor這個方法上面已經解析過
                    unparkSuccessor(h);
                }
                // 若是ws爲0,則更新狀態爲propagate,
                // 以後setHeadAndPropagate讀到ws<0的時候,會繼續喚醒後面節點
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 若是head在這個過程當中被更改了,會繼續自旋
            if (h == head)                   // loop if head changed
                break;
        }
    }

該方法在 head 節點存在後繼節點的狀況下,作了兩件事情:ui

  1. 若是 head 節點等待狀態爲 SIGNAL,則將 head 節點狀態設爲 0,並喚醒後繼未取消節點。this

  2. 若是 head 節點等待狀態爲 0,則將 head 節點狀態設爲 PROPAGATE,保證喚醒可以正常傳播下去。

設置PROPAGATE的做用:PROPAGATE狀態用在[setHeadAndPropagate](#void setHeadAndPropagate(Node node, int propagate)) ,當頭節點狀態被設爲 PROPAGATE 後,後繼節點成爲新的頭結點後。若 propagate > 0 條件不成立,則根據條件h.waitStatus < 0成立與否,來決定是否喚醒後繼節點,即向後傳播喚醒動做。

引入PROPAGATE是爲了解決什麼問題?

AbstractQueuedSynchronizer源碼解讀,強烈建議閱讀這篇博客。

獨佔式和共享式的區別總結

共享式獲取與獨佔式獲取最大的區別就是同一時刻可否有多個線程同時獲取到同步狀態。

  • 共享式訪問資源時,同一時刻其餘共享式的訪問會被容許。
  • 獨佔式訪問資源時,同一時刻其餘訪問均被阻塞。

AQS都提供了子類實現的鉤子方法,獨佔式的表明方法有:tryAcquire和tryRelease以及isHeldExclusively方法,共享式的表明方法有:tryAcquireShared和tryReleaseShared方法。

AQS中獲取操做和釋放操做的標準形式:

boolean acquire() throws InterruptedException{
    while( 當前狀態不容許獲取操做 ){
        if( 須要阻塞獲取請求){
            若是當前線程不在隊列中,則將其插入隊列
            阻塞當前線程
        }else{
            返回失敗
        }
    }
    可能更新同步器的狀態
    若是線程位於隊列中,則將其移除隊列
    返回成功
}

void release(){
    更新同步器的狀態
    if( 新的狀態容許某個被阻塞的線程獲取成功 ){
        解除隊列中一個或多個線程的阻塞狀態
    }
}

圖源:《併發編程的藝術》下圖是獨佔式同步狀態獲取的流程

當某個線程爭奪同步資源失敗以後,他們都會將線程包裝爲節點,並加入CLH同步隊列的隊尾,並保持自旋,一個是addWaiter(Node.EXCLUSIVE),一個是addWaiter(Node.EXCLUSIVE)

同步隊列中的線程在自旋時會判斷其前驅節點是否爲首節點,若是是首節點node.predecessor() == head,他們都會嘗試獲取同步狀態,只不過:

  • 獨佔式獲取狀態成功後,只會出隊一個節點。
  • 共享式獲取狀態成功後,除了出隊一個節點,還會喚醒後面的節點。

線程執行完邏輯以後,他們都會釋放同步狀態,釋放以後將會unparkSuccessor(h)喚醒其後可被喚醒的某個後繼節點。

參考閱讀

相關文章
相關標籤/搜索