AQS源碼泛讀,梳理設計流程(jdk8)

一。AQS介紹

  AQS(AbstractQueuedSynchronizer)抽象隊列同步器,屬於多線程編程的基本工具;JDK對其定義得很詳細,並提供了多種經常使用的工具類(重入鎖,讀寫鎖,信號量,CyclicBarrier,CountDownLatch),在閱讀源碼的時候,我是從具體工具類往上讀的,這樣會比較便於理解AQS的設計。node

  下面,我將從五種經常使用類去分析源碼,進而學習AQS。編程

  論文地址安全

 

二。開始吧,重入鎖(ReetrantLock)

  

  咱們要閱讀的重入鎖,它首先遵循Lock的規範,而且實現了序列化接口;而Lock的規範,必然定義瞭如何鎖的,如何解鎖的,而且規定了newCondition這個方法。而重入鎖中,真正使用AQS的是他裏面內涵的一個實現類Sync,它繼承自AQS,並具備AQS的全部規範。多線程

  這個內涵的Sync,在重入鎖中實現了兩種類型的隊列,一個是公平隊列,另外一個是非公平隊列,這取決於你構造重入鎖的時候傳入的是哪個,默認是非公平鎖;less

  咱們在進入lock這個方法的時候,看到它真正調用的是acquire方法,而acquire方法,是AQS的一個標準定義;咱們先進入公平鎖的閱讀;工具

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  咱們往上找,就找到了AQS裏的acquire方法;這個方法寫的很是乾淨,首先申請一個arg數量的權限,若是申請不成功,則進入等待隊列;這個tryAcquire方法,是在子類實現的;這裏插入一下,AQS裏存在一個state字段,它表示可一個許可,而重入鎖中它初始化爲0;而後咱們找到公平鎖的實現方法。oop

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

  在重入鎖中,咱們要能獲取鎖,實際上是state是否等於0;性能

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

  這個方法裏面有幾種狀況會返回false,(h != t)表示等待隊列非空,爲空就會返回false;另外一種狀況就是,隊列非空,當前隊列不等於後繼結點的隊列,會返回true;由於是公平隊列,你要申請權限必然是沒人排隊,即便有人排隊,也得是你最前面才能申請;ok,下一個條件就是CAS這個state值,成功就將獨佔狀態設爲當前線程;這個else if,就是重入鎖重入的關鍵了,若是當前線程和獨佔線程是一個,那就將權限再加acquires,固然這個state會超過上限並拋出overflow相似的異常。學習

  若是申請不成功,固然要排隊了,排隊都是雙端隊列的CRUD;ui

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    } 

   

  CAS初始化head,並頭尾指向一個地方;而後注意到外面是否是有一個for,又是自旋CAS的操做;在第二次循環的時候,會將node,整在後面。

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  這個方法,就是獲得的剛纔入隊的Node,而且又來一個自旋;而後看本身是否是頭結點,若是不是,則進入等待隊列,使用LockSupport來使當期線程休眠;這樣就構成了申請鎖並排隊的過程;

  接着咱們去看unlock方法,它指向的是AQS的release接口,與acquire相反,它是將state作減法;

  而這個方法,只有是獨佔線程調用才能夠,由於全部lock的非獨佔線程,全都會被park;

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

  因此在這裏並不須要CAS,它的安全性在於不是獨佔線程就會拋出非法異常;若是它釋放成功,就會喚醒後繼結點;後繼結點是head的後一個,這時候後繼結點被阻塞在lock代碼行能夠往下走了。這樣就造成了一個線程同步的重入鎖(公平)。咱們能夠看到AQS的設計很精湛,不少方法,都是重寫定製的,它值作了一些規範的定義。

  抽象隊列同步器,它是基於一個隊列作線程排隊的設計,那麼這個隊列的基本元素Node,咱們看一下。

 

   定義了一系列的狀態,攜帶每一次申請的線程thread,等等,很是直觀,註釋裏還給你畫出來了。

  看完公平鎖,咱們瞭解到它每次申請都要日後排隊,可想而知費公平鎖,就是不排隊?仍是要排隊的。

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

  咱們垂手可得找到上面的方法,它走的流程多了一個,就是若是許但是能夠申請的,則不須要排隊,直接申請,若是申請失敗,則走入公平鎖的流程。這就是說全部來申請鎖的線程,都有一次競爭的機會,若是沒有競爭上,仍是排隊。而release接口並未區別實現,因此每次unpark線程,仍是按照隊列順序。

  重入鎖就這麼簡單的讀完了。接着,公平鎖和非公平鎖的性能區別在哪呢?在於重複的park和unpark線程,對於非公平鎖,線程被park的概率會小一點,由於它不是必然排隊;而公平鎖必然是排隊的,它們的排隊機制是同樣的,而非公平鎖park線程的概率更小,則性能優於公平鎖。

 

三。什麼鬼,讀寫鎖(ReetrantReadWriteLock)

  看個圖,就不須要讀了。一把讀鎖,一把寫鎖,有興趣能夠本身研讀。

 

四。接着來,信號量(Semaphore)

   信號量是與重入鎖徹底不一樣類型的鎖,由於他是共享的(搞這麼複雜,不就是state初始化大於0)。它的做用用過的都有印象,就是多個線程能夠共享這一把鎖,而線程大於初始化憑證以後,就會被阻塞。

  

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

  我相信能夠很簡單地找到這個FairSync,在semaphore中。它先判斷是否只有你來申請,若是不是就回去操做state,若是申請小於0,直接返回,排隊。

   

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  與重入鎖不一樣的是,入隊的時候Node指定爲SHARE模式,而且再次嘗試獲取鎖,若是獲取的鎖是大於等於0的,將會調用setHeadAndPropagate方法傳播釋放。若是是大於0的,則會調用release接口,下一個喚醒的線程又會重複上述過程,一直喚醒到==0。和重複鎖不同的是,它具備傳染性。

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

  只要調用了這個方法,就會進行上述所說的傳染;正經的,這個方法就是激活線程並設置PROPAGATE,表示一直日後傳播激活。

 

  共享鎖和獨佔鎖的區別在於,解鎖線程會不會傳染...

  這樣,咱們已經讀了aqs裏面的兩種鎖了。

 

五。昇華,倒計時器(CountDownLatch)

   在看完上面的基本元素以後,搞一個倒計時器是什麼鬼,不就是搞個變量而後一直減,而後等於0的時候一古腦兒釋放???

   

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }        

  很容易找到上述代碼,咱們調用倒計時器時候,就會調用countDown方法,每次都會給初始化減一。這時候主線程或者等待線程,調用await在等待。直到它減到0,就會作doReleaseShared,這個時候等待隊列只有一個,就是父級線程,它就能夠往下走了。由於這貨減到0以後不會reset,因此不能複用。。。。

 

六。再看一個不看了,柵欄(CyclicBarrier)

  我曹?這貨可貴一筆,停更。

相關文章
相關標籤/搜索