淺談Java併發編程系列(九)—— AQS結構及原理分析

AQS介紹

AQS,即AbstractQueuedSynchronizer, 隊列同步器,它是Java併發用來構建鎖和其餘同步組件的基礎框架。來看下同步組件對AQS的使用:
圖片描述java

AQS是一個抽象類,主是是以繼承的方式使用。AQS自己是沒有實現任何同步接口的,它僅僅只是定義了同步狀態的獲取和釋放的方法來供自定義的同步組件的使用。從圖中能夠看出,在java的同步組件中,AQS的子類(Sync等)通常是同步組件的靜態內部類,即經過組合的方式使用。node

AQS原理簡介

AQS的實現依賴內部的同步隊列(FIFO雙向隊列),若是當前線程獲取同步狀態失敗,AQS會將該線程以及等待狀態等信息構形成一個Node,將其加入同步隊列的尾部,同時阻塞當前線程,當同步狀態釋放時,喚醒隊列的頭節點。安全

上面說的有點抽象,來具體看下,首先來看AQS最主要的三個成員變量:併發

private transient volatile Node head;
    
    private transient volatile Node tail;

    private volatile int state;

上面提到的同步狀態就是這個int型的變量state. head和tail分別是同步隊列的頭結點和尾結點。假設state=0表示同步狀態可用(若是用於鎖,則表示鎖可用),state=1表示同步狀態已被佔用(鎖被佔用)。app

下面舉例說下獲取和釋放同步狀態的過程:框架

獲取同步狀態

假設線程A要獲取同步狀態(這裏想象成鎖,方便理解),初始狀態下state=0,因此線程A能夠順利獲取鎖,A獲取鎖後將state置爲1。在A沒有釋放鎖期間,線程B也來獲取鎖,此時由於state=1,表示鎖被佔用,因此將B的線程信息和等待狀態等信息構成出一個Node節點對象,放入同步隊列,head和tail分別指向隊列的頭部和尾部(此時隊列中有一個空的Node節點做爲頭點,head指向這個空節點,空Node的後繼節點是B對應的Node節點,tail指向它),同時阻塞線程B(這裏的阻塞使用的是LockSupport.park()方法)。後續若是再有線程要獲取鎖,都會加入隊列尾部並阻塞。源碼分析

釋放同步狀態

當線程A釋放鎖時,即將state置爲0,此時A會喚醒頭節點的後繼節點(所謂喚醒,實際上是調用LockSupport.unpark(B)方法),即B線程從LockSupport.park()方法返回,此時B發現state已經爲0,因此B線程能夠順利獲取鎖,B獲取鎖後B的Node節點隨之出隊。ui

上面只是簡單介紹了AQS獲取和釋放的大體過程,下面結合AQS和ReentrantLock源碼來具體看下JDK是如何實現的,特別要注意JDK是如何保證同步和併發操做的。this

AQS源碼分析

接下來以ReentrantLock的源碼入手來深刻理解下AQS的實現。
上面說過AQS通常是以繼承的方式被使用,同步組件內部組合一個繼承了AQS的子類。
在ReentrantLock類中,有一個Sync成員變量,便是繼承了AQS的子類,源碼以下:spa

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
}

這裏的Sync也是一個抽象類,其實現類爲FairSync和NonfairSync,分別對應公平鎖和非公平鎖。ReentrantLock的提供一個入參爲boolean值的構造方法,來肯定使用公平鎖仍是非公平鎖:

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
     }

獲取鎖

這裏以NonfairSync類爲例,看下它的Lock()的實現:

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
     }

lock方法先經過CAS嘗試將同步狀態(AQS的state屬性)從0修改成1。若直接修改爲功了,則將佔用鎖的線程設置爲當前線程。看下compareAndSetState()和setExclusiveOwnerThread()實現:

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
     }

能夠看到compareAndSetState底層實際上是調用的unsafe的CAS系列方法。

protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

exclusiveOwnerThread屬性是AQS從父類AbstractOwnableSynchronizer中繼承的屬性,用來保存當前佔用同步狀態的線程。

若是CAS操做未能成功,說明state已經不爲0,此時繼續acquire(1)操做,這個acquire()由AQS實現提供:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

代碼很短,不太好了理解,轉換下寫法(代碼1):

public final void acquire(int arg) {
        boolean hasAcquired = tryAcquire(arg);
        if (!hasAcquired) {
            Node currentThreadNode = addWaiter(Node.EXCLUSIVE);
            boolean interrupted = acquireQueued(currentThreadNode, arg);
            if (interrupted) {
                selfInterrupt();
            }
        }
    }

簡單解釋下:
tryAcquire方法嘗試獲取鎖,若是成功就返回,若是不成功,則把當前線程和等待狀態信息構適成一個Node節點,並將結點放入同步隊列的尾部。而後爲同步隊列中的當前節點循環等待獲取鎖,直到成功。

首先看tryAcquire(arg)在NonfairSync中的實現(這裏arg=1):

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

首先獲取AQS的同步狀態(state),在鎖中就是鎖的狀態,若是狀態爲0,則嘗試設置狀態爲arg(這裏爲1), 若設置成功則表示當前線程獲取鎖,返回true。這個操做外部方法lock()就作過一次,這裏再作只是爲了再嘗試一次,儘可能以最簡單的方式獲取鎖。

若是狀態不爲0,再判斷當前線程是不是鎖的owner(即當前線程在以前已經獲取鎖,這裏又來獲取),若是是owner, 則嘗試將狀態值增長acquires,若是這個狀態值越界,拋出異常;若是沒有越界,則設置後返回true。這裏能夠看非公平鎖的涵義,即獲取鎖並不會嚴格根據爭用鎖的前後順序決定。這裏的實現邏輯相似synchroized關鍵字的偏向鎖的作法,便可重入而不用進一步進行鎖的競爭,也解釋了ReentrantLock中Reentrant的意義。

若是狀態不爲0,且當前線程不是owner,則返回false。
回到上面的代碼1,tryAcquire返回false,接着執行addWaiter(Node.EXCLUSIVE),這個方法建立結點併入隊,來看下源碼:

private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

首先建立一個Node對象,Node中包含了當前線程和Node模式(這時是排他模式)。tail是AQS的中表示同步隊列隊尾的屬性,剛開始爲null,因此進行enq(node)方法,從字面能夠看出這是一個入隊操做,來看下具體入隊細節:

private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

方法體是一個死循環,自己沒有鎖,能夠多個線程併發訪問,假如某個線程進入方法,此時head, tail都爲null, 進入if(t==null)區域,從方法名能夠看出這裏是用CAS的方式建立一個空的Node做爲頭結點,由於此時隊列中只一個頭結點,因此tail也指向它,第一次循環執行結束。注意這裏使用CAS是防止多個線程併發執行到這兒時,只有一個線程可以執行成功,防止建立多個同步隊列。

進行第二次循環時(或者是其餘線程enq時),tail不爲null,進入else區域。將當前線程的Node結點(簡稱CNode)的prev指向tail,而後使用CAS將tail指向CNode。看下這裏的實現:

private final boolean compareAndSetTail(Node expect, Node update) {
            return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
        }

expect爲t, t此時指向tail,因此能夠CAS成功,將tail從新指向CNode。此時t爲更新前的tail的值,即指向空的頭結點,t.next=node,就將頭結點的後續結點指向CNode,返回頭結點。通過上面的操做,頭結點和CNode的關係如圖:

圖片描述

其餘線程再插入節點以此類推,都是在追加到鏈表尾部,而且經過CAS操做保證線程安全。

經過上面分析可知,AQS的寫入是一種雙向鏈表的插入操做,至此addWaiter分析完畢。

addWaiter返回了插入的節點,做爲acquireQueued方法的入參,看下源碼:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

能夠看到,acquireQueued方法也是一個死循環,直到進入 if (p == head && tryAcquire(arg))條件方法塊。仍是接着剛纔的操做來分析。acquireQueued接收的參數是addWaiter方法的返回值,也就是剛纔的CNode節點,arg=1。node.predecessor()返回CNode的前置節點,在這裏也就是head節點,因此p==head成立,進而進行tryAcquire操做,即爭用鎖, 若是獲取成功,則進入if方法體,看下接下來的操做:

1) 將CNode設置爲頭節點。
2) 將CNode的前置節點設置的next設置爲null。

此時隊列如圖:

圖片描述

上面操做即完成了FIFO的出隊操做。
從上面的分析能夠看出,只有隊列的第二個節點能夠有機會爭用鎖,若是成功獲取鎖,則此節點晉升爲頭節點。對於第三個及之後的節點,if (p == head)條件不成立,首先進行shouldParkAfterFailedAcquire(p, node)操做(爭用鎖失敗的第二個節點也如此), 來看下源碼:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire方法是判斷一個爭用鎖的線程是否應該被阻塞。它首先判斷一個節點的前置節點的狀態是否爲Node.SIGNAL,若是是,是說明此節點已經將狀態設置若是鎖釋放,則應當通知它,因此它能夠安全的阻塞了,返回true。

若是前節點的狀態大於0,即爲CANCELLED狀態時,則會從前節點開始逐步循環找到一個沒有被「CANCELLED」節點設置爲當前節點的前節點,返回false。在下次循環執行shouldParkAfterFailedAcquire時,返回true。這個操做實際是把隊列中CANCELLED的節點剔除掉。

前節點狀態小於0的狀況是對應ReentrantLock的Condition條件等待的,這裏不進行展開。

若是shouldParkAfterFailedAcquire返回了true,則會執行:「parkAndCheckInterrupt()」方法,它是經過LockSupport.park(this)將當前線程掛起到WATING狀態,它須要等待一箇中斷、unpark方法來喚醒它,經過這樣一種FIFO的機制的等待,來實現了Lock的操做。

釋放鎖

經過ReentrantLock的unlock方法來看下AQS的鎖釋放過程。來看下源碼:

public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unlock調用AQS的release()來完成, AQS的若是tryRelease方法由具體子類實現。tryRelease返回true,則會將head傳入到unparkSuccessor(Node)方法中並返回true,不然返回false。首先來看看Sync中tryRelease(int)方法實現,以下所示:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
    setExclusiveOwnerThread(null);
    }
    setState(c);
    return free
}

這個動做能夠認爲就是一個設置鎖狀態的操做,並且是將狀態減掉傳入的參數值(參數是1),若是結果狀態爲0,就將排它鎖的Owner設置爲null,以使得其它的線程有機會進行執行。
在排它鎖中,加鎖的時候狀態會增長1(固然能夠本身修改這個值),在解鎖的時候減掉1,同一個鎖,在能夠重入後,可能會被疊加爲二、三、4這些值,只有unlock()的次數與lock()的次數對應纔會將Owner線程設置爲空,並且也只有這種狀況下才會返回true。

在方法unparkSuccessor(Node)中,就意味着真正要釋放鎖了,它傳入的是head節點(head節點是佔用鎖的節點),看下源碼:

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

內部首先會發生的動做是獲取head節點的next節點,若是獲取到的節點不爲空,則直接經過:「LockSupport.unpark()」方法來釋放對應的被掛起的線程,這樣一來將會有一個節點喚醒後繼續進入循環進一步嘗試tryAcquire()方法來獲取鎖。

以上ReentrantLock的釋放鎖的過程就分析完畢了。結合對ReentrantLock的加解鎖的過程的分析,本文對AQS的內部結構及原理進行了深刻的分析,應看到Java經過一個AQS隊列解決了許多問題,這個是Java層面的隊列模型,其實咱們也能夠利用許多隊列模型來解決本身的問題,甚至於能夠改寫模型模型來知足本身的需求。

相關文章
相關標籤/搜索