併發包學習(三)-AbstractQueuedSynchronizer總結

 J.U.C學習的第二篇AQS。AQS在Java併發包中的重要性,毋庸置疑,因此單獨拿出來理一理。本文參考總結自《Java併發編程的藝術》第五章第二節隊列同步器。node

什麼是AbstractQueuedSynchronizer?

AbstractQueuedSynchronizer是JUC併發包中鎖的底層支持,AbstractQueuedSynchronizer是抽象同步隊列,簡稱AQS,是實現同步器的基礎組件,併發包中鎖的實現底層就是使用AQS實現,固然大多數人不會直接用到AQS,可是學習這個類對併發包的底層理解仍是有莫大的幫助的。算法

AQS中維持了一個單一的狀態信息state,能夠經過getState,setState,compareAndSetState 函數修改其值,AQS內部維持一個FIFO隊列來完成資源獲取線程的排隊工做。對於ReentrantLock 的實現來講,state 能夠用來表示當前線程獲取鎖的可重入次數;編程

對應讀寫鎖ReentrantReadWriteLock 來講state 的高16位表示讀狀態,也就是獲取該讀鎖的次數,低 16位 表示獲取到寫鎖的線程的可重入次數;對於FuterTask 來講,state用來表示任務狀態(例如,還沒開始,運行,完成,取消);安全

對應CountDownlatch 和CyclicBarrie 來講,state用來表示計數器當前的值。併發

AQS有個內部類ConditionObject 是用來結合鎖實現線程同步,ConditionObject能夠直接訪問AQS對象內部的變量,好比 state 狀態值 和AQS隊列;框架

ConditionObject是條件變量,每一個條件變量對應一個條件隊列(單向鏈表隊列),用來存放調用條件變量的await()方法後被阻塞的線程。函數

對於AQS 來講,線程同步的關鍵是對狀態值state進行操做,根據state是否屬於一個線程,操做state的方式分爲獨佔模式和共享模式。學習

獨佔模式下獲取和釋放資源使用方法的源碼以下:ui

void acquire(int arg)
void acquireInterruptibly(int arg)
boolean release(int arg)

共享模式下獲取和釋放資源方法的源碼以下:this

void acquireShared(int arg)
void acquireSharedInterruptibly(int arg)
boolean releaseShared(int arg)

另外還有個查詢同步隊列等待線程狀況的方法以下:

Collection<Thread> getQueuedThreads()

對於獨佔鎖方式獲取的資源是與具體線程綁定的,也就是說若是一個線程獲取到了資源,就會標記是那個線程獲取到的,其餘線程嘗試操做state獲取資源時候發現當前該資源不是本身持有,就會獲取失敗後被阻塞;

好比獨佔鎖ReentrantLock的實現,當一個線程獲取了ReentrantLock的鎖後,AQS內部會首先使用CAS操做把state狀態從0 變成 1,而後設置當前鎖的持有者爲當前線程,當該線程再次獲取鎖的時候,發現當前線程就是鎖的持有者,則會把state狀態值從1變成2,

也就是設置可重入次數,當另一個線程獲取鎖的時候發現本身並非該鎖的持有者就會被放入AQS阻塞隊列後掛起。

對於共享操做方式資源是與具體線程不相關的,多個線程去請求資源時候是經過CAS方式競爭獲取資源,當一個線程獲取到了資源後,另一個線程再次獲取時候,若是 當前資源還能知足它的須要,則當前線程只須要使用CAS方式進行獲取便可,

共享模式下並不須要記錄哪一個線程獲取了資源;好比 Semaphore 信號量,當一個線程經過acquire()方法獲取一個信號量時候,會首先看當前信號兩個數是否知足須要,不知足則把當前線程放入阻塞隊列,若是知足則經過自旋CAS獲取信號量。

隊列同步器的實現分析

一、同步隊列

 同步隊列即當線程獲取同步狀態失敗,同步器會將當前線程以及等待信息構成節點Node加入同步隊列,同時阻塞當前線程,當同步狀態釋放,會把首節點的線程喚醒,使其再次嘗試獲取同步狀態。

節點是構成同步隊列的基礎,同步器擁有首節點head和尾節點tail,獲取狀態失敗的線程會加入隊列的尾部,結構如圖:

同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,將會喚醒後繼節點,後繼節點將會在獲取同步狀態成功時將本身設置爲首節點,以下圖

二、獨佔式同步狀態獲取與釋放

調用acquire(int arg)方法獲取同步狀態,當獲取失敗進入同步隊列,後續線程中斷,也不會從同步隊列移出。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先咱們調用acquire(int arg)方法後,而後調用自定義同步器實現的tryAcquire方法 嘗試獲取同步狀態,具體是設置狀態變量state的值,成功則直接返回。失敗則將當前線程封裝爲類型Node.EXCLUSIVE 的Node節點,並調用addWaiter(Node node)方法將該節點插入到AQS阻塞隊列尾部,最後調用acquireQueued(Node node,int arg)方法,使得該節點以「死循環」的方式獲取同步狀態,若是獲取不到則阻塞節點線程,而被阻塞的線程的喚醒主要依靠前驅節點的出隊或被中斷來實現。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

上述尾節點的添加代碼可知,使用了CAS的方式保證了尾節點的安全添加,避免了在併發的狀況下節點數量誤差或者順序混亂的狀況。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

當前線程以「死循環」的方式獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態,這又是爲何呢,緣由有兩點。

第一,頭節點是成功獲取到同步狀態的節點,而頭節點釋放狀態後,將喚醒後繼節點,後繼節點被喚醒後也須要檢查本身的前驅節點是不是頭節點。

第二,維護同步隊列的FIFO原則。

獨佔式的獲取同步狀態的流程以下:

 當一個線程獲取同步狀態並執行完相關邏輯後,就須要釋放同步狀態,使得後續節點可以獲取,具體代碼以下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

該方法會在釋放同步狀態後,喚醒後繼節點,unparkSuccessor(Node node)方法使用LockSupport來喚醒等待的線程。

總結:在獲取同步狀態時,同步器維護了一個同步隊列,獲取狀態失敗的線程都會被加入到隊列並在隊列自旋;移除隊列的條件是前驅節點爲頭節點且成功獲取同步狀態。在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,而後喚醒頭節點的後繼節點。

這裏須要注意的是AQS類並無提供可用的tryAcquire 和 tryRelease,正如AQS是鎖阻塞和同步容器的基礎框架,是抽象類,tryAcquire和 tryRelease 須要有具體的子類來實現的。

子類在實現tryAcquire 和 tryRelease 時候要根據具體場景使用CAS算法嘗試修改該狀態值state,成功則返回true,不然返回false。子類還須要定義在調用acquire 和 release 方法時候 state 狀態值的增減表明什麼含義。

好比繼承自AQS實現的獨佔鎖ReentrantLock,定義當status爲0的時候標示鎖空閒,爲1 的時候標示鎖已經被佔用,在重寫tryAcquire的時候,內部須要使用CAS算法看當前status是否爲0,若是爲0 則使用CAS設置爲1,

並設置當前線程持有者爲當前線程,並返回true,若是CAS失敗則返回false。繼承自 AQS 實現的獨佔鎖實現 tryRelease 時候,內部須要使用CAS算法把當前status值從1 修改成0,並設置當前鎖的持有者爲null,而後返回true,若是CAS失敗則返回false。

三、共享式同步狀態獲取與釋放

共享模式和獨佔模式最大的區別就是同一時刻是否能有多個線程同時獲取狀態。

當共享模式訪問資源時,其餘共享式的訪問均被容許,而獨佔式訪問被阻塞。而獨佔式訪問資源時,同一時刻其餘任何訪問均被阻塞。

  public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

線程調用acquireShared(int arg) 獲取共享資源時候,會首先使用tryAcquireShared嘗試獲取資源,當返回值大於等於0,表示能夠獲取狀態。失敗則將當前線程封裝爲類型Node.SHARED 的 Node 節點後插入到 AQS 阻塞隊列尾部,並使用 LockSupport.park(this) 掛起當前線程。

在doAcquireShared(arg)方法中,若是前驅節點爲頭節點,嘗試獲取同步狀態大於0,則表示能夠獲取同步狀態並從自旋過程當中退出。

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

當一個線程調用 releaseShared(int arg) 時候會嘗試使用, tryReleaseShared 操做釋放資源,這裏是設置狀態變量 state 的值,而後使用 LockSupport.unpark(thread)激活 AQS 隊列裏面最先被阻塞的線程 (thread)。

同理須要注意的 AQS 類並無提供可用的 tryAcquireShared 和 tryReleaseShared,正如 AQS 是鎖阻塞和同步器的基礎框架,tryAcquireShared 和 tryReleaseShared 須要有具體的子類來實現。

子類在實現 tryAcquireShared 和 tryReleaseShared 時候要根據具體場景使用 CAS 算法嘗試修改狀態值 state, 成功則返回 true,否者返回 false。

好比繼承自 AQS 實現的讀寫鎖 ReentrantReadWriteLock 裏面的讀鎖在重寫 tryAcquireShared 時候,首先看寫鎖是否被其它線程持有,若是是則直接返回 false,否者使用 CAS 遞增 status 的高 16 位,在 ReentrantReadWriteLock 中 status 的高 16 爲獲取讀鎖的次數。

繼承自 AQS 實現的讀寫鎖 ReentrantReadWriteLock 裏面的讀鎖在重寫 tryReleaseShared 時候,內部須要使用 CAS 算法把當前 status 值的高 16 位減一,而後返回 true, 若是 cas 失敗則返回 false。

相關文章
相關標籤/搜索