啃透Java併發-AQS詳解

上一篇咱們解讀了LockSupport的源碼,其中有提到JUC的一塊重要基石是AbstractQueuedSynchronizer類,簡稱AQS,那麼這一篇就正式學習這個類。因爲我也是以學代練,確定有不少地方理解的不夠到位,歡迎你們留言討論哈!仍是友情提示,本文的分析的JDK版本是8。node

AQS的工做原理概述

爲何要在開篇就介紹AQS的工做原理呢?由於先對一些知識點有個大概瞭解,能夠幫咱們在看源碼時更容易理解一些,作到有的放矢,事半功倍。安全

這裏我總結了三個比較關鍵的點,須要咱們知道的。bash

  1. AQS內部有一個volatile變量state,而且提供了compareAndSetState方法,能夠線程安全的修改state的值,不一樣的需求場景下,state會有不一樣的意義,粗俗一點說,就是遊戲規則咱們本身根據需求來定義,只要你們都遵照這個規則,這個遊戲就可以玩起來。
  2. 當競爭鎖失敗後,其實能夠理解爲CAS更新state失敗,當前線程會被封裝進一個Node對象,而後放入一個Node雙向隊列中,而後調用LockSupport.park,讓線程等待。
  3. 當鎖被釋放後,AQS會檢查隊列中是否有線程在等待,若是有,unpark喚醒該線程,並從等待隊列中刪除對應的Node(將該node設爲頭結點)。

AQS的實現思路仍是很清晰的,使用一個state來維護競爭狀態,使用CAS來安全的更新state,獲取鎖失敗的線程進入等待隊列unpark,鎖被釋放後,從隊列中喚醒一個線程來繼續嘗試獲取鎖。併發

使用AQS自定義一個Lock類

AQS支持獨佔和共享二種模式,獨佔模式相對容易理解一些,光說不練假把式,咱們先利用AQS實現一個獨佔鎖SmartLock來加深理解。ide

public class SmartLock {

    private class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            if (getExclusiveOwnerThread() == Thread.currentThread()) return true;
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else {
                return false;
            }
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            setExclusiveOwnerThread(null);
            return true;
         }
    }

    private Sync mSync;

    public SmartLock() {
        mSync = new Sync();
    }

    public void lock() {
        mSync.acquire(1);
    }

    public void unLock() {
        mSync.release(1);
    }
}
複製代碼

咱們新建一個內部類Sync繼承AQS,重寫它的tryAcquire和tryRelease方法,能夠理解爲它們分別對應獨佔模式下的嘗試獲取鎖和嘗試釋放鎖,返回true表示成功,false表示失敗。oop

這裏咱們能夠停下來想一下,既然AQS內部有一個state能夠利用,那咱們能夠這樣設定遊戲規則,state=1時表示鎖被佔用,state=0表示鎖沒有被某個線程持有。學習

protected boolean tryAcquire(int arg) {
            // 先判斷當前持有鎖的線程是否是本線程,若是是,直接返回true,因此咱們這個鎖是支持可衝入的
            if (getExclusiveOwnerThread() == Thread.currentThread()) return true;
            // CAS的方式更新state,只有當state=0時會成功更新爲1
            if (compareAndSetState(0, 1)) {
                // 當前線程已經獲取了鎖,設置爲Owner thread
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else {
                // 返回true,當前線程會被加入等待隊列中
                return false;
            }
        }
        
        protected boolean tryRelease(int arg) {
            // 狀態更新爲0,
            setState(0);
            // Owner thread設置爲null
            setExclusiveOwnerThread(null);
            return true;
        }
複製代碼

咱們在SmartLock類中定義二個方法lock和unLock,分別調用acquire和release便可,這裏的參數沒有用到,傳1便可。測試

public void lock() {
        mSync.acquire(1);
    }

    public void unLock() {
        mSync.release(1);
    }
複製代碼

咱們用SmartLock來實現一個線程安全的累加器,邏輯很簡單就是提供一個increase方法,對counter進行++操做,咱們知道++操做不是原子的,因此咱們用SmartLock來保證原子性。ui

public class SmartAdder {
    private volatile int counter;
    private SmartLock lock;

    public SmartAdder() {
        lock = new SmartLock();
    }

    public void increase() {
        lock.lock();
        try {
            counter++;
        } finally {
            lock.unLock();
        }
    }

    public int getCounter() {
        return this.counter;
    }
}
複製代碼

咱們寫一段測試的case來驗證一下,新建了一個固定有20個核心線程的線程池,而後提交了40個累加任務,每一個任務循環100000次,這樣獲得的正確結果應該是4000000。this

public static void main(String[] args) {
        int threadCount = 20;
        int addCount = 100000;
        SmartAdder smartAdder = new SmartAdder();
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount * 2; i++) {
            executorService.submit(() -> {
                for (int j = 0; j < addCount; j++) {
                    smartAdder.increase();
                }
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("count:" + smartAdder.getCounter());
        executorService.shutdown();
    }
    
    // 打印結果
    count:4000000
複製代碼

打印就結果驗證了咱們的SmartLock確實可以正常工做,這樣一個簡單的互斥鎖就完成了,其實也並不複雜嘛!其中CountDonwLatch也是JUC提供的一個併發同步類,關於它的用法後面會詳解,這裏你們只須要知道await可讓當前線程等待線程池中的任務執行完成便可。

AQS源碼解讀

有了前面的鋪墊,咱們如今先看下AQS中獨佔模式的acquire和release二個方法的具體實現。

獨佔模式acquire方法源碼解讀

先看acquire方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
複製代碼

能夠看到acquire是一個final方法,咱們無法重寫它,可是有預留一個tryAcquire方法讓咱們重寫,咱們在上面的SmartLock類中也是重寫了tryAcquire該方法,若是tryAcquire返回false,會調用acquireQueued方法,它的參數是addWaiter(Node.EXCLUSIVE)的結果,咱們先來具體跟進看一下addWaiter的實現。

private Node addWaiter(Node mode) {
        // 新建一個Node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // 若是隊列的尾標兵tail不爲空,將新加入的node插入到隊尾,並更新tail
        if (pred != null) {
            node.prev = pred;
            // 若是CAS設置tail成功,直接返回
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 若是tail爲空,或者CAS設置tail失敗
        enq(node);
        return node;
    }
複製代碼

這裏的思路就是將新建的node插入到隊尾,可是因爲到考慮到線程安全的問題,採用了CAS更新,若是更新失敗,調用enq方法,繼續跟進看一下實現。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 若是檢查到隊列沒有初始化,先執行初始化,注意head對頭是標兵
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // 在循環中執行CAS插入尾部操做
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼

因此看下來,addWaiter邏輯也很清晰,就是要將當前線程,封裝爲node插入到隊列尾部。再看下acquireQueued的實現

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 返回node前驅
                final Node p = node.predecessor();
                // 若是前驅是head,說明當前隊列中該線程排在第一順位,再次調用tryAcquire
                // 由於後面調用的parkAndCheckInterrupt會讓線程等待,當鎖被release時,線程會被unpark
                // 因此從新tryAcquire來獲取鎖,若是獲取成功,會將當前node設爲頭結點,至關於將當前
                // node從隊列中刪除了,由於頭結點只是一個標兵,
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    // 這裏之因此能夠直接將.next置爲null,而沒有考慮node的next,由於是剛加入的node
                    // 它在隊尾,而又是head的next,說明隊列中就它一個,直接將head.next = null就能夠了
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 先對head設置waitStatus標示位,而後park線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
複製代碼

shouldParkAfterFailedAcquire這個方法頗有意思,它是將head的waitStatus設爲SINGLE,用來標識有任務須要被喚醒,在後面unpark的時候會檢查該標識位。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 獲取node的pre的waitStatus,
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 若是已是Node.SIGNAL,能夠安全的park,直接返回
            return true;
        if (ws > 0) {
            // 說明pred被取消了,併發時會出現這種狀況,由於咱們沒有加鎖
            // 繼續向前查找waitStatus <= 0的node,
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 將pred的waitStatus設爲SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
複製代碼

再看下parkAndCheckInterrupt這個方法的實現

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
複製代碼

比較簡單,直接調用了LockSupport.park,因此AQS中讓線程等待的方式就是park,這也就是爲何咱們前一篇文章要分析LockSupport源碼的緣由。

那麼線程park等待了,那固然就要有喚醒,咱們看下AQS中release的實現。

獨佔模式release方法源碼解讀

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

一樣的AQS中release是一個final方法,不能被重寫,咱們能夠重寫tryRelease方法。當head不爲空,切waitStatus不爲0時,調用unparkSuccessor方法,跟進去看下實現

private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
           // 先將waitStatus設爲0
            compareAndSetWaitStatus(node, ws, 0);

        // 通常須要被喚醒的是node.next,可是若是next的node被取消了,或者waitStatus>0,這時候這裏的
        // 策略是從尾部開始從新選擇一個node來unpark
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // unpark喚醒線程
            LockSupport.unpark(s.thread);
    }
複製代碼

release的實現相對簡單一些,前面介紹tryAcquire失敗後,會將當前線程插入到等待隊列中時,而後將head的waitStatus置爲SINGAL,那麼在release時,會先檢查這個標識,而後unpark,這裏有個小細節,若是head.next被取消了或者waitStatus>0,會從隊列的尾部開始往前查找到第一個符合條件的node來unpark。

這裏有個細節你們要注意,release只是將隊列中第一個知足條件等待的線程喚醒,因此接下來的邏輯仍是在acquireQueued方法中,繼續嘗試調用tryAcquire,若是成功,則會被出隊列(當前節點設爲頭結點),線程繼續執行,不然繼續等待。

介紹完了獨佔模式,再來看下共享模式。與獨佔模式相似,AQS也對共享模式提供了模板方法。分別是acquireShared和releaseShared,它們也都是final的,咱們可以重寫的方法是tryAcquireShared和tryReleaseShared。

共享模式acquireShared解讀

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
複製代碼

acquireShared先調用了tryAcquireShared方法,若是返回值小於0, doAcquireShared一樣構建SHARED類型的Node加入等待隊列。這裏要提一下,tryAcquireShared方法使咱們須要重寫的,注意它的返回值是int類型的,而上面咱們分析獨佔模式tryAcquire的返回值是boolean,由於在共享模式下這個返回值須要有三種狀態,因此須要是int類型。

  • tryAcquireShared < 0,獲取共享鎖失敗
  • tryAcquireShared = 0,獲取成功,可是不須要喚醒隊列中後續的節點
  • tryAcquireShared > 0,獲取成功,須要喚醒隊列中後續的節點

好,咱們繼續看下doAcquireShared的實現

// 添加當前節點到隊列,與獨佔模式相似,再也不贅述
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲取node的前驅
                final Node p = node.predecessor();
                if (p == head) {
                    // 再次嘗試獲取共享鎖
                    int r = tryAcquireShared(arg);
                    // 若是獲取成功
                    if (r >= 0) {
                        // 檢查隊列中後續的節點,是否須要被喚醒
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // park讓線程等待,與獨佔模式相似,再也不贅述
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
        
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        // propagate > 0, 或者 當前頭結點或者當前節點node的waitStatus < 0時,調用doReleaseShared
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
複製代碼

大部分的邏輯跟獨佔模式相似,可是多了一個檢查後續節點是否須要被喚醒的邏輯。

共享模式releaseShared解讀

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
複製代碼

能夠看到doReleaseShared是在一個循環中,若是在調用中,head發生了變化,繼續循環,不然挑出循環,而在都獨佔模式下,沒有這樣的併發問題,因此獨佔模式下不須要循環,另外幹活的就是unparkSuccessor方法,它來喚醒等待的線程,上面在分析獨佔模式時已經分析過了,這裏再也不贅述。

總結

文章先是概述了一下AQS的基本實現原理,而後利用AQS實現了一個簡單的互斥鎖,最後詳細分析了AQS中獨佔和共享二種模式的關鍵方法的實現。以上。

相關文章
相關標籤/搜索