[源碼分析]ReentrantLock & AbstractQueuedSynchronizer & Condition

首先聲明一點: 我在分析源碼的時候, 把jdk源碼複製出來進行中文的註釋, 有時還進行編譯調試什麼的, 爲了不和jdk原生的類混淆, 我在類前面加了"My". 好比把ReentrantLock更名爲了MyReentrantLock, 在源碼分析的章節裏, 我基本不會對源碼進行修改, 因此請忽視這個"My"便可.java


一. 簡介

鎖是什麼? 鎖是一種標誌, 或者是一種資源, 持有鎖的線程才能夠繼續往下執行相應的內容. 未持有鎖的線程須要等待這個鎖資源. 直到獲取到了這個鎖, 才能夠繼續向下執行.node

0. ReentrantLock的一個小demo

想本身運行這段代碼的話, 把代碼中的"MyReentrantLock" 改成 "ReentrantLock" 便可. (後續的代碼也同樣, 若是想本身運行, 還編譯報錯, 請把我修改的代碼改回來. 也就是把"My"都去掉就行了)數據結構

public class Main {
    private static MyReentrantLock lock = new MyReentrantLock();

    public static void main(String[] args) throws Exception {
        // 場景以下: 線程1先得到鎖, 釋放後, 線程2 再得到鎖.

        new Thread(() -> {
            System.out.println("線程1啓動");
            lock.lock();
            System.out.println("線程1搶到鎖");
            try {
                System.out.println("這裏是業務邏輯1");
                quietSleep(2);// 兩秒後釋放鎖
                System.out.println("兩秒後");
            } finally {
                lock.unlock();
                System.out.println("線程1釋放鎖");
            }
        }).start();

        new Thread(() -> {
            System.out.println("線程2啓動");
            quietSleep(1); // 在這裏進行謙讓. 確保上面的線程能先運行. 也就是讓上面的線程先得到鎖
            lock.lock();
            System.out.println("線程2搶到鎖");
            try {
                System.out.println("這裏是業務邏輯2");
            } finally {
                lock.unlock();
                System.out.println("線程2釋放鎖");
            }
        }).start();

    }

    public static void quietSleep(long sec) {
        try {
            Thread.sleep(sec * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

 輸出的結果以下:併發

1. sync字段

首先來看一下ReentrantLock裏惟一的一個字段函數

Sync繼承自AQS(AbstractQueuedSynchronizer, 如下簡稱AQS) . 公平鎖和非公平鎖都繼承了Sync. Sync是ReentrantLock類裏鎖的統一聲明. 源碼分析

2. lock/unlock依賴Sync

ReentraintLock的 lock()和unlock()方法實際上都是靠Sync來實現的:ui

3. 鎖內部類定義

Sync 和 公平鎖 和 非公平鎖 都是ReentrantLock的內部類, 類的定義部分以下(細節先隱藏起來了, 後面會講):this

4. ReentrantLock構造器

ReentrantLock有兩個構造器..net

1. 默認構造器是直接使用了非公平鎖. 非公平鎖就是不必定按照"先來後到"的順序來進行爭搶.線程

2. 帶參構造器能夠傳遞一個bool類型. true的時候爲公平鎖. 公平鎖就是按照"先來後到"的順序來進行爭搶.

二. 公平鎖申請鎖

使用鎖的第一個步驟, 固然就是先申請鎖了, 咱麼來分析一下源碼, 看看申請鎖的流程吧. 

1. 公平鎖獲取鎖的流程(單線程, 沒有爭搶) 

首先從最外層的調用lock()方法開始. 我們在Main方法裏寫下這兩行代碼:

MyReentrantLock就是ReentrantLock, 我複製了源代碼, 而後改了個名字而已.

Reentraint類的lock()方法最終仍是調用的sync.lock()

因爲咱們如今使用的是公平鎖. 因此sync如今是FairSync. 因此sync.lockI()實際上就是FairSync類裏的lock()方法

發現lock()調用的是acquire(1)這個方法, 這個方法是在AQS類裏實現的.代碼以下:

arg當時傳進來的是1, 因此首先進行的是tryAcquire(1)來進行"嘗試獲取鎖"的操做. 這時一種樂觀的想法.

tryAcquire方法的具體實如今FairSync類裏, 具體代碼以下:

/**
     * @return 返回true: 獲取到鎖; 返回false: 未獲取到鎖
     * 何時返回true呢?  1.沒有線程在等待鎖;2.重入鎖,線程原本就持有鎖,也就能夠理所固然能夠直接獲取
     * @implNote 嘗試直接獲取鎖.
     */
    protected final boolean tryAcquire(int acquires) {
        // 獲取當前線程的引用
        final Thread current = Thread.currentThread();

        // 當前鎖的計數器. 用於計算鎖被獲取的次數.在重入鎖中表示鎖重入的次數.因爲這個鎖是第一次被獲取, 因此c==0
        int c = getState();

        // c==0, 也就是 state == 0 ,重入次數是0, 表示此時沒有線程持有鎖.
        if (c == 0) {
            // 公平鎖, 因此要講究先來後到
            // 由於有多是上一個持有鎖的線程剛剛釋放鎖, 隊列裏的線程還沒來得及爭搶, 本線程就亂入了
            // 因此每次公平鎖搶鎖以前, 都要判斷一下等待隊列裏是否有其餘線程
            if (!hasQueuedPredecessors() &&
                    // 執行到這裏說明等待隊列裏沒有其餘線程在等待.
                    // 若是沒有線程在等待,那就用CAS嘗試一下,成功了就獲取到鎖了,
                    // 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了 =_=
                    compareAndSetState(0, acquires)) {
                
                // 到這裏就獲取到鎖了,標記一下,告訴你們,如今是我(當前線程)佔用了鎖
                setExclusiveOwnerThread(current);
                // 成功獲取鎖了, 因此返回true
                return true;
            }


            //-- 因爲如今模擬的是單純地獲取一次鎖, 沒有重入和爭搶的狀況, 因此執行不到這裏, 上面的cas確定會成功, 而後返回true


        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

爭搶完鎖以後會返回true, 而後回到上層方法acquire : 

if語句裏 && 前面是false, 不會繼續往下執行了. 當前線程獲取到了鎖, 並且執行了全部該執行的內容, 就完事兒了.

2. 公平鎖進行重入的流程

重入就是一個線程獲取到了鎖, 而後這個線程又一次申請(進入)了這個鎖.

重入用synchronized來舉例就是這樣:

用ReentrantLock來舉例子就是這樣:

同一個線程(main線程) 首先進行了lock.lock()申請並佔有了鎖, 隨後又執行了一次lock.lock(). 還沒釋放鎖的狀況下, 又一次申請鎖. 這樣就是重入了.  

上面一小節已經分析了第一行的lock.lock()是如何獲取到鎖的, 因此咱們只分析 重入的部分, 也就是後面那句lock.lock()的執行流程.

前面的執行過程一直是如出一轍的, 直到這裏:

/**
     * @return 返回true: 獲取到鎖; 返回false: 未獲取到鎖
     * 何時返回true呢?  1.沒有線程在等待鎖;2.重入鎖,線程原本就持有鎖,也就能夠理所固然能夠直接獲取
     * @implNote 嘗試直接獲取鎖.
     */
    protected final boolean tryAcquire(int acquires) {
        // 獲取當前線程的引用
        final Thread current = Thread.currentThread();

        // 當前鎖的計數器. 因爲前面的那句lock已經獲取到鎖了, 因此這裏是status==1, 也就是 c==1
        int c = getState();

        // c==1, 表示當前有線程持有鎖, 因此這段if是進不去了
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }

        // 因爲 c==1 , 沒法進入if語句, 因此來看看滿不知足這裏的 else if
        // 這個鎖被人佔了, 但仍是不死心, 因而看一下是否是當前線程本身佔的這個鎖.
        // (人家女生說有喜歡的人, 爲何不問問是否是本身呢 = =.)
        // 因爲是同一個線程, 因此就是本身啦! 因此會進入這個else if分支,
        } else if (current == getExclusiveOwnerThread()) {
            // 代碼執行到這裏了, 就是所謂的 重入 了

            // 這裏的acquires的值是1, 因此nextc =  1 + 1 , 也就是2了
            int nextc = c + acquires;
            // 小於0, 說明int溢出了
            if (nextc < 0) throw new Error("Maximum lock count exceeded");
            // 在這裏把狀態更新一下, 把state更新爲2, 意思就是這個鎖被同一個線程得到2次了. 
            // (你們就能夠以此類推, 下次再重入的話, 那麼就會再+1, 就會變爲3....)
            setState(nextc);
            // 重入完成, 返回true
            return true;
        }
        
        return false;
    }

 還記得上小節講的, 獲取鎖的時候進入的是這段代碼的if語句, 而重入就不同了, 進入的是 else if語句. 但最終返回的仍是true, 表示成功. 

上面講的是無爭強的狀況, 接下來說講有爭搶的狀況.

3. 公平鎖cas爭搶失敗

場景以下:

一開始鎖是空閒狀態, 而後兩個線程同時爭搶這把鎖(在cas操做處發生了爭搶).

一個線程cas操做成功, 搶到了鎖; 另外一個線程cas失敗. 

代碼例子以下(代碼的意思到位了, 可是這段代碼最後不必定會在cas處進行爭搶, 你們意會就行了):

cas操做成功的線程就和上面第1小節的同樣, 就不用再重複描述了.

而cas爭搶失敗的線程會何去何從呢? 看我給你們分析: 

 /**
     * @return 返回true: 獲取到鎖; 返回false: 未獲取到鎖
     * 何時返回true呢?  1.沒有線程在等待鎖;2.重入鎖,線程原本就持有鎖,也就能夠理所固然能夠直接獲取
     * @implNote 嘗試直接獲取鎖.
     */
    protected final boolean tryAcquire(int acquires) {
        // 獲取當前線程的引用
        final Thread current = Thread.currentThread();

        // 當前鎖的計數器.
        int c = getState();

        // state == 0 表示此時沒有線程持有鎖
        if (c == 0) {
            // 本場景中, 一開始鎖是空閒的, 因此隊列裏沒有等待的線程
            if (!hasQueuedPredecessors() &&
                    // 兩個線程在這裏進行爭搶
                    // cas搶成功的會進入到if代碼塊
                    // cas搶失敗的, 就跳出整個if-else, 也就是直接到最後一行代碼
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }

        // cas 操做失敗後, 會這直接執行到這裏. 返回false.
        return false;
    }

 在這裏返回了false, 回到上一層函數.

第一個條件是true, 因此會繼續往下執行acquireQueued方法. 來準備讓這個失敗的線程進入隊列等待.

下面繼續來給你們講解 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) .

先講講這個addWaiter(Node.EXCLUSIVE):

/**
     * 將當前線程封裝爲Node, 而後根據所給的模式, 進行入隊操做
     *
     * @param mode 有兩種模式 Node.EXCLUSIVE 獨佔模式, Node.SHARED 共享模式
     * @return 返回新節點, 這個新節點封裝了當前線程.
     */
    private Node addWaiter(Node mode) { // 這個mode沒用上.
        Node node = new Node(Thread.currentThread(), mode);
        // 我們剛纔都沒見到過tail被賦予了其餘的值, 固然就是null了.
        Node pred = tail;
        // tail是null的話, pred就是null, 因此不會進入到這個if語句中.因此跳過這個if語句.
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        // 由於鎖的等待隊列是懶初始化, 直到有節點插入進來, 它才初始化.
        // 而如今這個掙錢失敗的線程, 正好是鎖創建以來, 第一個進入等待隊列的線程. 因此如今才準備進行初始化.
        // 初始化完了後會把當前線程的相關信息和引用封裝成Node節點, 而後插入到隊列當中.而且制定head 和 tail.
        // tail就不等於null了, 因此下一次addWaiter方法被調用的時候, 就會執行上面的if語句了. 而不會跳過if語句, 來到這裏進行初始化了.
        enq(node);
        // 返回這個Node節點.
        return node;
    }

 目的就是要將這個cas失敗的線程封裝成節點, 而後插入到隊尾中. (等待隊列是懶初始化,) 

若是隊列已經初始化了, 那麼tail就不會是null, 就會執行上面代碼中的if語句, 調整一下指針的引用就行了.

可是若是隊列還未初始化, 那麼就應該先初始化, 再插入. 先初始化,再插入, 對應的代碼是enq(node). 

接下來說解一下enq方法: 

   /**
     * 採用自旋的方式入隊
     * CAS設置tail,直到爭搶成功.
     */
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            //  最開始tail確定是null, 進入if進行初始化head和tail.
            if (t == null) { // Must initialize
                // 設置head 和tail. cas來防止併發.
                if (compareAndSetHead(new Node())) tail = head;
                
            // if 語句執行完了後, 以後的for循環就會走else了.
            } else {
                // 爭搶入隊, 沒搶到就繼續for循環迭代.搶成功了就能夠return了,否則一直循環.
                // 爲何是用cas來爭搶呢? 由於怕是多個線程一塊兒執行到這裏啊 
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 剛纔的addWaiter(Node.EXCLUSIVE) 分析完了, 總之就是addWaiter以後, 隊列確定是被建立完了, 並且還把node(當前線程的封裝)插入到了隊列的隊尾. 而且返回了這個node.  acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 能夠簡化爲 acquireQueued(node)

因此繼續分析acquireQueued方法.

final boolean acquireQueued(final Node node, int arg) {
        // node是剛纔addWaiter方法插入到隊尾的節點
        // arg 是 1

        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                // 獲取node節點的前驅.
                final Node p = node.predecessor();

                // 若是node節點的前驅是head
                if (p == head
                        // 那麼能夠再嘗試着搶一下鎖. 
                        // 等待隊列裏的第一個節點很樂觀, 由於確實頗有可能會立刻輪到他
                        && tryAcquire(arg)) {
                    // 若是這個node就是那麼巧合, 剛剛鎖被釋放了, 這回從新搶就真的搶到了
                    // 那麼就把當前節點設爲頭結點.(頭結點的含義就是當前持有鎖的線程)
                    setHead(node);
                    // 上一個節點既然已經釋放了鎖, 也就該GC了. 置爲null, 方便GC收集
                    p.next = null; // help GC
                    // 很明顯是獲取鎖成功了啊, 因此failed = false
                    failed = false;

                    // 這麼大一段代碼, 只有這一處return
                    return interrupted;
                }

                //---- 若是不是隊頭,  那麼就會執行到這裏.
                //---- 或者雖然做爲等待隊列裏的第一名, 單因爲持有鎖的線程仍是沒有釋放, 因此仍是沒搶到鎖. 那麼也會執行到這裏

                // 獲取鎖失敗的時候是否該阻塞
                if (shouldParkAfterFailedAcquire(p, node)
                        // 在這裏阻塞, 等待喚醒
                        && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 上面那段, 若是中途異常了的話, 就會執行到這裏. (通常不會到這裏的)
            if (failed) cancelAcquire(node);
        }
    }

 上面這段代碼中shouldParkAfterFailedAcquire方法 和 parkAndCheckInterrupt() 方法 還未解釋. 一個一個來.

/**
     * 當前線程沒有搶到鎖,是否須要掛起當前線程
     *
     * @param pred 前驅結點
     * @param node 當前結點
     * @return 若是線程須要被阻塞, 那麼就返回true
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驅節點的 waitStatus == -1 ,說明前驅節點狀態正常,當前線程須要掛起,直接能夠返回true
        if (ws == Node.SIGNAL)
            return true;
        // 大於0, 其實就是等於1, Node.CANCELLED 是 1, 由於狀態中只有這個狀態是大於0的...說明前驅節點取消了排隊
        // 因此下面這塊代碼說的是, 在鏈表中從prev結點開始, 往前刪掉CANCELLED狀態的結點.
        // 只有CANCELLED狀態值大於0
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
                
            // 刪掉以後再往前看看, 看看前面是否是CANCELLED, 若是是, 那還得繼續往前刪
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 在前面的兩個if語句中排除掉了waitStatus值爲-1和1的狀況,
            // 只剩下0,-2,-3這三個狀態了
            // 然而在咱們前面的源碼中,都沒有看到有設置waitStatus的,
            // 因此只剩下等於0的狀況了
            // 下面的操做就是, 若是waitStatus等於0, 那麼就用cas將前驅結點的waitStatus設置爲-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire裏的前兩行是在判斷前驅節點prev的狀態. 可是以前我們分析代碼, 並無發現哪裏設置了waitState.

因此waitState是默認值0.

因此shouldParkAfterFailedAcquire會直接執行下面的else, 在這裏吧pred的waitState設置爲-1, 而後返回false.

回到剛纔的acquireQueued方法. 因爲外層是for循環, 會在下一次for循環在此執行到shouldParkAfterFailedAcquire方法. 

因爲剛纔已經把前驅節點prev的waitState改成1了, 因此此次在前兩行判斷prev的waitState時, 直接就知足條件, 而後return true了.

shouldParkAfterFailedAcquire方法return true了, 纔會往下執行parkAndCheckInterrupt方法.

下面是parkAndCheckInterrupt()方法. 最終返回Thread.interrupted(). 返回線程是否被中斷. (中斷和掛起不是一回事 )

    /**
     * 在這裏線程阻塞.
     * 被喚醒的時候會返回, 若是被中斷過, 那麼就返回true
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        // 掛起.
        MyLockSupport.park(this);
        return Thread.interrupted();
    }

 LockSupport.park(this)會掛起當前線程. 可是LockSupport.park還有一個隱藏功能. 就是, 若是先對一個線程unpark, 再對這個線程park, 那麼此次的park是失效的. 下一次park纔會掛起.

緣由就是, 對一個沒有被park的線程進行unpark的時候, 會把標誌位perm置爲1. 而每次park的操做, 都是先去檢查perm是否爲1.

若是是1, 那麼置爲0, 而且此次不掛起.

若是perm爲0, 那麼就直接掛起這個線程.

4. 公平鎖因爲隊列內有元素而失敗

demo以下. 前兩個線程, 其中一個獲取鎖成功, 另外一個失敗, 而後進入等待隊列.

稍後, 第三個線程來獲取鎖, 可是這時因爲等待隊列中已經有元素在等待了. 因此會直接失敗, 而後會被插入到等待隊列的尾部.

上面的main方法中總共有三個線程想要佔有鎖. 前兩個鎖的爭搶在上小節就已經模擬過了.

咱麼如今只分析第三個線程申請鎖的流程. 這個場景下的tryAcquire方法以下(會直接返回false):

/**
         * @return 返回true: 獲取到鎖; 返回false: 未獲取到鎖
         * 何時返回true呢?  1.沒有線程在等待鎖;2.重入鎖,線程原本就持有鎖,也就能夠理所固然能夠直接獲取
         * @implNote 嘗試直接獲取鎖.
         */
        protected final boolean tryAcquire ( int acquires){
            // 獲取當前線程的引用
            final Thread current = Thread.currentThread();

            // 當前鎖的計數器.
            int c = getState();

            // 不會走這的if語句, 由於鎖被其餘線程佔有, 確定不是0
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }

            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }

            // 因爲隊列內有元素, 因此if語句不執行, 
            // 因爲不是重入, else if 也不執行.
            // 直接返回false
            return false;
        }

這段方法返回false, 說明須要執行這個. acquireQueued(addWaiter(Node.EXCLUSIVE), arg). 先看看addWaiter方法有什麼區別.

/**
     * 將當前線程封裝爲Node, 而後根據所給的模式, 進行入隊操做
     *
     * @param mode 有兩種模式 Node.EXCLUSIVE 獨佔模式, Node.SHARED 共享模式
     * @return 返回新節點, 這個新節點封裝了當前線程.
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 如下幾行代碼想把當前node加到鏈表的最後面去,也就是進到阻塞隊列的最後
        Node pred = tail;
        // 若是tail不是空, 說明有頭結點.說明這個隊列已經被初始化了.
        // 由於本小節講的就是: 由於公平鎖的等待隊列中有其餘線程才致使當前線程爭鎖失敗, 因此說明等待隊列不只被初始化了, 並且裏面還有元素.
        if (pred != null) {
            // node設置本身的前驅爲pred
            node.prev = pred;
            // 用CAS把當前節點node設置爲隊尾, 若是成功後,tail指針就指向了node
            if (compareAndSetTail(pred, node)) {
                // 若是cas爭搶成功, 那麼就會在這裏返回.(而cas失敗的, 會跳過這個if代碼塊, 會執行到下面的enq方法)
                // 剩下的就是整理一下鏈表數據結構的鏈接問題了
                // pred調整本身的後繼爲node
                pred.next = node;
                return node;
            }
        }

        // 若是在上面的cas中設置失敗, 那麼仍是會執行到這裏.
        // 而後在enq方法裏靠for循環+cas的形式, 不斷嘗試着插入到隊尾.

        enq(node);
        return node;
    }

 後續執行的就和上小節的同樣了.就不重複了...

固然, 場景是舉不完的, 舉完的話就跟笛卡爾積那樣了. 我這裏只是靠這四個例子來儘可能完整地分析了獲取鎖的流程.

三. 公平鎖釋放鎖

剛纔申請鎖的流程. 可是爭搶失敗的那些線程, 最後都進入到了等待隊列裏, 而後就杳無音訊了.

那當前持有鎖的線程釋放鎖後, 是如何喚醒等待隊列裏的線程, 讓下一個線程獲取鎖的呢?

咱麼接下來分析一下釋放鎖的過程吧.

1. 申請1次鎖, 執行一些業務, 而後釋放

 

我們只關注unlock, lock就跳過了, 前面講過了.

ReentrantLock類的lock()方法 代碼以下: 

  

而這個release是AQS裏的方法. 源碼以下:

其中arg變量值是1.  首先會執行tryRelease(1) 來嘗試釋放鎖.

若是嘗試成功了, 那麼tryRelease(1)就會返回true, 就會繼續執行if代碼塊裏的內容. 

若是嘗試失敗了, 那麼tryRelease(1)就會返回false. 而後就會跳過if語句, 最終本段方法(release方法)也會返回false.

我們先分析一下tryRelease方法吧(tryRelease方法的源碼在Sync抽象類裏):

protected final boolean tryRelease(int releases) {
        // releases == 1

        // c 就是重入次數 -1 , 因爲本場景下模擬的是簡單的獲取一次鎖, 而後釋放, 不涉及到重入. 因此getState() == 1
        // 因此c = 1 - 1 , c如今等於0
        int c = getState() - releases;

        // 判斷當前的線程是否是持有鎖的線程, 否則拋異常.
        // 這是爲了其餘的線程搗亂.
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();

        // 用於標記是否能夠徹底釋放鎖
        boolean free = false;

        // c等於0, 說明沒有重入了, 能夠徹底釋放了.
        if (c == 0) {
            // 標記一下, 準備徹底釋放鎖
            free = true;
            // 把鎖的持有者設置爲空, 表示鎖被釋放.
            setExclusiveOwnerThread(null);
        }
        // 把剛纔c==0 設置爲state
        setState(c);

        // 表示是否徹底釋放. 本場景下返回true
        return free;
    }

 因爲返回的是true, 因此返回後還有if語句塊要執行:

接下來分析一下其中的unparkSuccessor方法, 看看他是如何喚醒下一個節點的.(這個方法在AQS裏)

unpark以後, 就會把以前park(掛起)的線程激活, 而後繼續執行:

若是線程被中斷了, 那麼parkAndCheckInterrupt()方法會返回true, 而後就會執行interrupted = true 這句話. 

掛起和中斷不是一回事, 通常不會被中斷的. 因此通常不會執行interrupted=true這句話. 

外層是個for循環, 當前線程被激活後, 做爲等待隊列中的第一個線程, 來進行獲取鎖. 因爲是公平鎖, 因此能夠放心拿到, 沒有人會搶, 因此會正常獲取到鎖.

2. 重入鎖的釋放

 

釋放重入的鎖(同一個線程屢次獲取的鎖), 執行流程惟一不一樣的就是tryRelease方法了, 其餘的都同樣, 能夠直接參考上面一小節的.

咱麼看看重入的時候, tryRelease是如何執行的吧.

 protected final boolean tryRelease(int releases) {
        // 其實就是重入計數器 -1
        // 而因爲本線程獲取了2次這個鎖, 因此state字段的值爲2
        // 因此c = 2 - 1
        // 因此如今c == 1
        int c = getState() - releases;

        // 判斷當前的線程是否是持有鎖的線程, 否則拋異常.
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();

        // 用來標記是否徹底釋放鎖
        boolean free = false;

        // c如今等於1, 不會進入這個if代碼塊
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }


        // 設置重入計數器, 也就是讓 state =1 
        setState(c);

        // 返回false.
        return free;
    }

 本小節和上小節的區別就是這段代碼了.既然這段方法返回false, 那麼返回後, release方法的if代碼塊天然也就不執行了. 

四. 非公平鎖的獲取

剛纔講解了公平鎖, 那麼接下來說講非公平鎖, 究竟是怎麼個不公平呢?

1. 非公平鎖與公平鎖獲取的區別

因爲非公平鎖的獲取與公平鎖的獲取, 只有一點點區別. 因此咱麼只分析出區別就行了, 其餘的部分都同樣的.

而後會調用到NonfairSync類裏的lock()方法.

這裏就體現出了區別.

公平鎖裏的lock()方法裏面, 只有acquire(1). 

而非公平鎖在acquire(1)以前多了一次cas操做. 一上來就嘗試着搶佔鎖, 看看有沒有機會(萬一真的這個時候持有鎖的線程正好把鎖釋放了呢). 非公平鎖根本無論是否有其餘人在排隊.上來就是一搶.

當此次cas失敗了, 纔會像公平鎖同樣進入acquire(1)方法:

 

這裏和公平鎖同樣. 只是, 非公平鎖的tryAcquire方法和公平鎖的tryAcquire方法內部實現不同.

看看非公平鎖的tryAcquire方法吧:

我們繼續往下看看nonfairTryAcquire方法吧:

/**
         * 不公平地嘗試獲取鎖.
         * 不公平的語義就是: 不用判斷隊列裏是否有其餘線程在等待, 直接搶.
         */
        final boolean nonfairTryAcquire(int acquires) {
            // 獲取當前線程的引用
            final Thread current = Thread.currentThread();
            // 當前線程的重入次數
            int c = getState();
            // 若是是0, 表示此時此刻鎖還被被任何一個線程所佔用
            if (c == 0) {
                // 當c==0的時候, 公平鎖鎖是先判斷隊列裏是否有其餘線程在等待, 若是沒有, 再去cas爭搶.
                // 而非公平鎖這裏, 就是根本就不去理會等待隊列, 本身抓到機會就趕忙搶
                // cas來爭搶, 讓重入次數變1.
                // 用cas是由於這個地方會發生併發.
                // 多個搶佔固然只有一個成功了
                if (compareAndSetState(0, acquires)) {
                    // 設置鎖的擁有者爲當前線程.
                    setExclusiveOwnerThread(current);
                    return true;
                }

                // 若是不是0, 說明鎖被某一個線程佔用了
                // 既然被佔用了, 那就有兩種狀況: 1. 被本身佔用; 2. 被別的線程佔用
                // 因此先看看是否是本身佔用的, 若是是本身佔用的, 那就重入.
            } else if (current == getExclusiveOwnerThread()) {
                // 其實就是+1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 這裏不會產生爭搶, 沒必要用cas
                // 由於只有佔用鎖的這一個線程才能進入到這個else if 裏
                // 一個線程不可能發生爭搶
                setState(nextc);
                return true;
            }
            // 1. 若是在if裏的cas爭搶失敗
            // 2. 或者是不知足else if的條件
            // 那就會直接返回false
            // 無論是成功仍是失敗, 都不會有線程的等待阻塞之類的. 都是當即返回.
            return false;
        }

 這裏的非公平鎖的nonfairTryAcquire方法 和 公平鎖的tryAcquire方法很像. 區別就是:

非公平鎖是, 當c==0. 也就是此時此刻, 鎖是空閒狀態的時候. 直接就嘗試着用cas來爭搶鎖, 看看是否能成功, 而無論等待隊列是否還有其餘線程再等待.

而公平鎖在c==0的時候, 也就是state==0 的時候, 先去看看隊列裏是否有其餘線程再等待, 若是隊列裏沒有其餘線程在等待, 纔會去cas爭搶. 否則就會把機會讓給隊列裏的第一個線程, 而本身會進入到等待隊列的尾部.

爲何c==0了, 隊列裏還有可能會有其餘的元素在等待呢?

由於c==0只是說明當前鎖的狀態是空閒狀態. 只是上一個線程剛剛把鎖釋放, 當前線程就來爭搶鎖了, 還沒來得及喚醒等待隊列裏的第一個線程呢.  

其餘地方就跟公平鎖都同樣的, 就是多了本小節講的兩處cas. 

五. 非公平鎖的釋放

1. 非公平鎖會致使飢餓

也就是說, 上一個線程釋放鎖後, `等待隊列` 裏的第一個線程就會被激活, 而後會執行tryAcquire方法. 若是這個時候有新的線程來爭搶, 

因爲是非公平模式, 有可能新的線程會搶到這個鎖. 若是新的線程搶到了鎖, 那麼剛剛被激活的線程(等待隊列裏的第一個線程)就是執行tryAcquire失敗, 這個方法執行失敗就意味着會被再次被掛起. 若是併發量嚴重, 極可能`等待隊列`裏的全部線程在必定時間內都沒法被正常調度.也就是產生了線程飢餓的現象.

六. Condition簡介

1. condition簡介和demo

public class Main {
    private static MyReentrantLock lock = new MyReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {
        new Thread(Main::funcA).start();
        new Thread(Main::funcB).start();
    }

    public static void funcA() {
        lock.lock();

        System.out.println("await以前");
        try {
            condition.await(); // 在這裏等待被其餘線程通知(signal)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("await以後");

        lock.unlock();
    }

    public static void funcB() {
        lock.lock();

        System.out.println("signal以前");
        condition.signal(); // 這裏會通知funcA的await().讓funcA()繼續執行下去
        System.out.println("signal以後");

        lock.unlock();
    }
}

 運行這段代碼, 輸出以下: 

若是仍是沒有體會到區別, 那麼把main方法裏的第二行註釋掉, 而後再執行一下:

輸出結果以下: 

也就是說, await()會使當前線程掛起, 須要其餘線程通知他, 他才能被激活(喚醒).

七. Condition實例化

1. 獲取condition的例子

2. condition實例化的源碼

newCondition方法在ReentrantLock類裏的實現以下:

Sync類裏的newCondition()方法以下:

ConditionObject是AQS裏的一個內部類,實現自Condition,  類的聲明以下(具體源碼後面再解釋):

八. condition的等待(await) 和 通知(signal)

1. 只執行一句await()後的流程

 

await()方法的具體實如今AQS裏的內部類ConditionObject類裏:

public final void await() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        // 添加到 condition 的`條件隊列`中
        Node node = addConditionWaiter();
        // 徹底釋放鎖,返回值是釋放鎖以前的 state 值
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 這裏的isOnSyncQueue就是在判斷node節點是否在鎖的`等待隊列`裏
        while (!isOnSyncQueue(node)) {
            // 在這裏線程掛起
            MyLockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //---- 程序不會執行到下面, 由於在前面就已經掛起了.


        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

 這裏面有幾個方法以前沒提到過, 在這裏一一攻破.

先解決addConditionWaiter()方法:

   /**
     * 將當前線程對應的節點入隊,插入隊尾, 而且做爲本方法的返回值.
     */
    private Node addConditionWaiter() {
        // 本例子中的場景下, 只執行過一次await()方法, 因此是第一個進入`條件隊列`的元素.
        // 因此lastWaiter和firstWaiter確定都是null.
        Node t = lastWaiter;
        // 本例子中t==null, 因此這段if暫時不考慮吧
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        
        // 新建節點
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        
        // 由於t==null, 意思是隊列目前仍是空的, 因此這個節點是第一個節點, 因此是firstWaiter.
        if (t == null) firstWaiter = node;
        else t.nextWaiter = node;
        
        // 但node同時也是最後一個節點, 也就是lastWaiter
        lastWaiter = node;
        
        // 最後會返回本方法
        return node;
    }

 接下來是fullRelease(node)方法, 來徹底釋放鎖:

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 這裏使用了當前的 state 做爲 release 的參數,也就是徹底釋放掉鎖,將 state 置爲 0
            if (release(savedState)) {
                failed = false;
                // 而且把釋放鎖以前的state值返回出去. (本例子中是1)
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

 最後就是isOnSyncQueue(node)方法, 來判斷鎖的`等待隊列`中有沒有當前這個node:

   /**
     * 這個方法就是判斷 node 是否已經移動到sync queue了
     * (signal 的時候會將節點從條件隊列移到sync queue)
     */
    final boolean isOnSyncQueue(Node node) {
        // 當進入Condition隊列時,waitStatus確定爲CONDITION,
        // 若是同時別的線程調用signal,Node會從Condition隊列中移除,
        // 而且移除時會清除CONDITION狀態。
        // 從移除到進入sync queue隊列,中間這段時間prev必然爲null,因此仍是返回false,即被park
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            // 本例子中, 會在這裏返回
            return false;

        
        //--- 本例子中, 程序不會往下執行了. 可是下面的代碼仍是分析一下吧. 這樣待會兒就不用再從新講個方法了.
        
        
        // 當別的線程進入sync queue隊列時,會和前一個Node創建先後關係,因此若是next存在,說明必定在release隊列中
        if (node.next != null) // If has successor, it must be on queue
            return true;

        // 到這裏還沒找到, 那隻能去鎖的`等待隊列`裏一個一個找了

        // 可能該Node剛剛最後一個進入release隊列,因此是tail,其next必然是null,因此須要從隊尾向前查找
        // 這個方法的源碼就不講了, 太簡單了, 就是鏈表從後往前找node.找到了就true.沒找到就false.
        return findNodeFromTail(node);
    }

 最終會執行到await()方法裏的park()方法, 線程掛起. 等待被別的線程喚醒.

2. 只執行一句signal()後的流程

而後我們看看signal()的源碼.

因爲firstWaiter==null, 因此first==null, signal方法直接就退出了.

3.一個線程await等待, 另外一個線程用signal來喚醒

本場景的程序demo以下: 

public class Main {
    private static Scanner scanner = new Scanner(System.in);

    private static MyReentrantLock lock = new MyReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) throws Exception {
        new Thread(Main::funcA).start();
        new Thread(Main::funcB).start();
    }

    public static void funcA() {
        lock.lock();

        System.out.println("await以前");
        try {
            condition.await(); // 在這裏等待被其餘線程通知(signal)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("await以後");

        lock.unlock();
    }

    public static void funcB() {
        lock.lock();

        System.out.println("signal以前");

        System.out.print("請輸入任意內容並回車, 以執行signal方法: ");
        scanner.next(); // 在這裏進行阻塞, 在控制檯輸入任意內容後回車, 就會接觸阻塞, 就會執行signal方法, 也就是通知funcA()方法.
        condition.signal(); // 這裏會通知funcA的await().讓funcA()繼續執行下去
        System.out.println("signal以後");

        lock.unlock();
    }

}

 首先, await()仍然執行到park這句, 而後掛起, 這點與本章第1小節的流程是同樣的(看下圖, 我選中的park那行代碼, await就在這裏掛起): 

 

而此時控制檯以下:

此時尚未執行signal, 由於我用輸入流給signal方法進行阻塞了, 須要輸入內容後回車, 就能夠調用到signal方法.signal通知後,await就會被喚醒.

以下:

我們分析一下signal是如何通知await, 而後讓await線程被喚醒的:

由於剛纔執行過await(), 因此firstWaiter不會是null. 因此會調用到doSignal方法:

 

上面這段代碼也比較簡單, 就是將firstWaiter爲頭的這個鏈表, 把第一個元素出隊, 而後讓第二個元素當新的頭部. 而後讓剛纔出隊的那個元素執行tansferForSignal方法.

    /**
     * 將節點從條件隊列轉移到鎖的`等待隊列`
     *
     * true 表明成功轉移
     * false 表明在 signal 以前,節點已經取消了
     */
    final boolean transferForSignal(Node node) {
        /*
         * 在這裏將 waitStatus 置爲 0.
         * 若是成功設置爲0, 那麼繼續往下面執行
         * 若是CAS 失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION,說明節點已經取消,那麼直接return false.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * enq(node): 自旋進入阻塞隊列的隊尾.這個在將lock()方法的時候你們見到過.就是同一個方法.
         * 這裏的返回值 p 是 node 在阻塞隊列的前驅節點
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程。
        // 若是 ws <= 0, 那麼 compareAndSetWaitStatus 將會被調用
        // 由於節點入隊後,須要把前驅節點的狀態設爲 Node.SIGNAL(-1)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 若是前驅節點取消或者 CAS 失敗,會進到這裏喚醒線程(可是本場景下不會執行到這裏)
            MyLockSupport.unpark(node.thread);
        // 返回true
        return true;
    }

 將上面這段代碼總結一下就是: 將本節點的waitState設置爲了0. 而後讓本節點插入到到鎖的`等待隊列`, 而後將前驅節點的waitState設置爲了1. 而後返回了true.  

這一行的tansferForSignal返回了true, 取反了就是false了, 因此退出了 while循環. 至此signal方法就執行完畢了.

signal乾的主要事情就是: 把`條件隊列`裏的第一個元素轉移(尾插)到了鎖的`等待隊列`裏. 

`條件隊列`就是firstWaiter爲頭結點的一個鏈表.

`等待隊列`就是我們上面將lock() unlock()的時候提到的鎖的等待隊列.

signal方法執行完了後, 接下來就該執行unlock()方法了. 以下圖:

unlock()所作的事情就是, 釋放當前的鎖, 而後激活`等待隊列`裏的第一個線程. 

而在本場景下, 如今等待隊列裏有且僅有一個元素, 就是signal方法轉移的那個元素.

unlock()以前分析過, unlock會調用release方法:

release方法所作的就是釋放鎖(第一個紅色代碼), 而後喚醒`等待隊列`裏的第一個線程(第二個紅色代碼).

unlock()方法執行完了後, 剛纔await掛起的那個線程就又被激活了.

因此接下來執行的是acquireQueued方法, 這個方法在將鎖的時候講過, 因此這裏簡單講解一下:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                // 本場景下: node是隊列裏的第一個元素, 也就是await的線程對應的node.
                // 本場景下: p是node的前一個節點, 也就是head節點了
                final Node p = node.predecessor();
                // 本場景下: p==head. 鎖如今空閒, tryAcquire也會成功.
                if (p == head && tryAcquire(arg)) {
                    // 將node設置爲新的head. head節點隱含的意思就是: head節點對應的線程是當前鎖的持有者
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;

                    // 返回false. 由於本場景下該線程沒有被中斷過.
                    return interrupted;
                }
                
                //--- 本場景下, 不會執行到下面的代碼
                
                if (shouldParkAfterFailedAcquire(p, node)
                        && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) cancelAcquire(node);
        }
    }

 最終這個方法返回了true. 接下來, await()方法繼續執行剩下的幾行代碼就能夠退出了:

這兩行就是作了相應的維護操做, 和線程中斷判斷, 這裏就不講解了. 

隨後,await方法執行完了, 退出方法棧.

而後就繼續往下執行.  執行System.out.println, 而後是unlock.

至此本段程序就執行完了. 

相關文章
相關標籤/搜索