關於 ReentrantLock 中鎖 lock() 和解鎖 unlock() 的底層原理淺析

關於 ReentrantLock 中鎖 lock() 和解鎖 unlock() 的底層原理淺析node

以下代碼,當咱們在使用 ReentrantLock 進行加鎖和解鎖時,底層究竟是如何幫助咱們進行控制的啦?多線程

    static Lock lock = new ReentrantLock();

    public static void main(String[] args) {

        // 使用兩個線程模擬多線程執行併發
        new Thread(() -> doBusiness(), "Thread-1").start();
        new Thread(() -> doBusiness(), "Thread-2").start();
    }

    private static void doBusiness() {
        try {
            lock.lock();
            System.out.println("須要加鎖的業務處理代碼,防止併發異常");
        } finally {
            lock.unlock();
        }
    }

帶着這樣的疑問,咱們前後跟進 lock()和unlock() 源碼一探究竟併發

說明:app

  一、在進行查看 ReentrantLock 進行 lock() 加鎖和 unlock() 解鎖源碼時,須要知道 LockSupport 類、瞭解自旋鎖以及鏈表相關知識。ui

  二、在分析過程當中,假設第一個線程獲取到鎖的時候執行代碼須要很長時間才釋放鎖,及在第二個第三個線程來獲取鎖的時候,第一個線程並無執行完成,沒有釋放鎖資源。this

  三、在分析過程當中,咱們假設第一個線程就是最早進來獲取鎖的線程,那麼第二個第三個線程也是依次進入的,不會存在第三個線程先於第二個線程(即第三個線程若是先於第二個線程發生,那麼第三個線程就是咱們下面描述的第二個線程)spa

 

1、 lock() 方法線程

一、查看lock()方法源碼翻譯

    public void lock() {
        sync.lock();
    }

  從上面能夠看出 ReentrantLock 的 lock() 方法調用的是 sync 這個對象的 lock() 方法,而 Sync 就是一個實現了抽象類AQS(AbstractQueuedSynchronizer) 抽象隊列同步器的一個子類,繼續跟進代碼(說明:ReentrantLock 分爲公平鎖和非公平鎖,若是無參構造器建立鎖默認是非公平鎖,咱們按照非公平鎖的代碼來說解)code

  1.1 關於Sync子類的源碼

abstract static class Sync extends AbstractQueuedSynchronizer { 
    // 此處省略具體實現AbstractQueuedSynchronizer 類的多個方法
}

  這裏須要說明的是 AbstractQueuedSynchronizer 抽象隊列同步器底層是一個經過Node實現的雙向鏈表,該抽象同步器有三個屬性 head 頭節點tail 尾節點 和 state 狀態值。

    屬性1:head——註釋英文翻譯:等待隊列的頭部,懶加載,用於初始化,當調用 setHead() 方法的時候會對 head 進行修改。注:若是 head 節點存在,則 head 節點的 waitStatus 狀態值用於保證其不變成 CANCELLED(取消,值爲1) 狀態

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    屬性2: tail——tail節點是等待隊列的尾部,懶加載,在調用 enq() 方法添加一個新的 node 到等待隊列的時候會修改 tail 節點。

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    屬性3:state——用於同步的狀態碼。若是 state 該值爲0,則表示沒有其餘線程獲取到鎖,若是該值大於1則表示已經被某線程獲取到了鎖,該值能夠是二、三、4,用該值來處理重入鎖(遞歸鎖)的邏輯。

    /**
     * The synchronization state.
     */
    private volatile int state;

  1.2 上面 Sync 類使用 Node來做爲雙向隊列的具體保存值和狀態的載體,Node 的具體結構以下

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node(); // 共享鎖模式(主要用於讀寫鎖中的讀鎖)
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null; // 排他鎖模式(也叫互斥鎖)

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1; // Node線程等待取消,再也不參與鎖競爭,處於這種狀態的Node會被踢出隊列,被GC回收
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1; // 代表Node線程須要被喚醒,能夠競爭鎖
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2; // 表示這個Node線程在條件隊列中,由於等待某個條件而被阻塞 
        /** waitStatus value to indicate the next acquireShared should unconditionally propagate */
        static final int PROPAGATE = -3; // 使用在共享模式頭Node有可能處於這種狀態, 表示鎖的下一次獲取能夠無條件傳播

        volatile int waitStatus; // 默認初始狀態爲0,全部新增node節點的初始狀態都是0

        volatile Node prev; // 前驅Node節點

        volatile Node next; // 後繼Node節點

        /** The thread that enqueued this node.  Initialized on construction and nulled out after use.*/
        volatile Thread thread; // 當前線程和節點進行綁定,經過構造器初始化Thread,在使用的時候將當前線程替換原有的null值

        // 省略部分代碼

  說明:Sync 經過Node節點構建隊列,Node節點使用prev和next節點來行程雙向隊列,使用prev來關聯上一個節點,使用next來關聯下一個節點,每個node節點和一個thread線程進行綁定,用來表示當前線程在阻塞隊列中的具體位置和狀態 waitStatus

 

二、上面的 sync.lock() 繼續跟進源碼(非公平鎖):

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else acquire(1);
    }

說明:上面代碼說明,若是 compareAndSetState(0, 1) 爲 true ,則執行 setExclusiveOwnerThread(Thread.currentThread()) ,不然執行 acquire(1);

  2.1 compareAndSetState(0, 1) 底層使用unsafe類完成CAS操做,意思就是判斷當前state狀態是否爲0,若是爲零則將該值修改成1,並返回true;state不爲0,則沒法將該值修改成1,返回false。

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

  2.2 假如第1個線程進來的時候 compareAndSetState(0, 1) 確定執行成功,state 狀態會從0變成1,同時返回true,執行 setExclusiveOwnerThread(Thread.currentThread()) 方法:

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

  setExclusiveOwnerThread(Thread.currentThread()) 表示將當前 Sync 對象和當前線程綁定,意思是代表:當前對內同步器執行的線程爲 thread,該 thread 獲取了鎖正在執行。

  2.3 假如進來的線程爲第2個,而且第一個線程還在執行沒有釋放鎖,那麼第2個線程就會執行 acquire(1)方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  進入到該方法中發現,須要經過 !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 兩個方法判斷是否須要執行 selfInterrupt();

    (1)先執行tryAcquire(arg)這個方法進行判斷

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
       // 獲取state狀態,由於第一個線程進來的時候只要尚未執行完就已經將state設置爲1了(即:2.1步)
int c = getState();
       // 再次判斷以前獲取鎖的線程是否已經釋放鎖了
if (c == 0) {
// 若是以前的線程已經釋放鎖,那麼當前線程進來就將狀態改成1,而且設置當前佔用鎖的線程爲自身
if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
       // 判斷當前佔用鎖的線程是否是就是我自身,若是是我自身,這將State在原值的基礎上進行加1,來處理重入鎖邏輯
else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires;
          // 判斷重入鎖次數是否是超過限制,超過限制則直接報錯
if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

    從上面的方法看,若是第二個線程進來,且第一個線程還未釋放鎖的狀況下,該方法 tryAcquire(arg) 直接放回false,那麼 !tryAcquire(arg) 就爲true,須要判斷第二個方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),第二個方法先執行addWaiter(Node.EXCLUSIVE),及添加等待線程進入隊列

    (2)添加等待線程到同步阻塞隊列中

    private Node addWaiter(Node mode) {
     // 將當前線程和node節點進行綁定,設置模式爲排他鎖模式 Node node
= new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;// 第二個線程也就是第一次進來該方法的線程,tail確定是null if (pred != null) { // 若是tail尾節點不爲空,表示第三、四、5次進來的線程 node.prev = pred; // 那麼就將當前進來的線程節點的 prev 節點指向以前的尾節點 if (compareAndSetTail(pred, node)) { // 經過比較並交換,若是當前尾節點在設置過程當中沒有被其餘線程搶先操做,那麼就將當前節點設置爲tail尾節點 pred.next = node; // 將之前尾節點的下一個節點指向當前節點(新的尾節點) return node; } } enq(node); // 若是爲第二個線程進來,就是上面的 pred != null 成立沒有執行,直接執行enq()方法 return node; }
    private Node enq(final Node node) {
        for (;;) { // 一直循環檢查,至關於自旋鎖
            Node t = tail;
            if (t == null) { // Must initialize
                   // 第二個線程的第一次進來確定先循環進入該方法,這時設置頭結點,該頭結點通常被稱爲哨兵節點,而且頭和尾都指向該節點
if (compareAndSetHead(new Node())) tail = head; } else {
          // 一、第二個線程在第二次循環時將進入else 方法中,將該節點掛在哨兵節點(頭結點)後,而且尾節點指向該節點,而且將該節點返回(該節點有prev信息) node.prev
= t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

    如上在執行 enq(final Node node) 結束,而且返回添加了第二個線程node節點的時候, addWaiter(Node mode) 方法會繼續向上返回

    或者

    若是是添加第三、4個線程直接走 addWaiter(Node mode) 方法中的 if 流程直接添加返回

    都將到2.3 步,執行acquireQueued(final Node node, int arg),再次貼源碼

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    (3)即下一步就會執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法:

    注:上面的流程是將後面的線程加入到了同步阻塞隊列中,下面的方法第一個if (p == head && tryAcquire(arg))則是看同步阻塞隊列的第一條阻塞線程是否能夠獲取到鎖,若是可以獲取到鎖就修改相應鏈表結構,第二個if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()即將發生線程阻塞

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
          // 自旋鎖,若是爲第二個線程,那麼 p 就是 head 哨兵節點
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {
            // 上面的 if 代表若是當前線程爲同步阻塞隊列中的第一個線程,那麼就再次試圖獲取鎖 tryAcquire(),若是獲取成功,則修改同步阻塞隊列 setHead(node); // 將head頭結點(哨兵節點)設置爲已經獲取鎖的線程node,並將該node的Theread 設置爲空 p.next
= null; // help GC 取消和以前哨兵節點的關聯,便於垃圾回收器對以前數據的回收 failed = false; return interrupted; }
          // 若是第二個線程沒有獲取到鎖(同步阻塞隊列中的第一個線程),那麼就須要執行下面兩個方法,註標的方法會讓當前未獲取到鎖的線程阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
    private void setHead(Node node) {
        // 將哨兵節點日後移,而且將 thread 設置爲空,取消和之前哨兵節點的關聯,並於垃圾回收器回收
        head = node;
        node.thread = null;
        node.prev = null;
    }

    shouldParkAfterFailedAcquire(p, node)這個方法將哨兵隊列的狀態設置爲待喚醒狀態

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     // 該方法第一次進來 pred爲哨兵節點,ws爲哨兵節點的初始0狀態
     // 該方法第二次進來 pred爲哨兵節點,ws爲哨兵節點的狀態-1狀態
     // 該方法第3、四次進來就爲鏈表的倒數第二個節點,ws爲倒數第二個節點的狀態
int ws = pred.waitStatus;
    
if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */
       // 如上第二次進來的時候哨兵節點的狀態就是-1,此時返回true
return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do {
          // 若是ws及倒數第二個節點是取消狀態,那麼經過雙向鏈表向前找倒數第三個,第四關節點,直到向前找到最近一個狀態不是取消的node節點,並把當前節點掛在該節點後 node.prev
= pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 將頭結點(哨兵節點)設置成待喚醒狀態,第一次進來的時候 有0——>-1 而後繼續執行返回false } return false; }
    parkAndCheckInterrupt()這個方法會讓當前線程阻塞
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // LockSupport.park()會致使當前線程阻塞,直到某個線程調用unpark()方法
        return Thread.interrupted();
    }

    那麼在lock()方法執行時,只要第一個線程沒有unlock()釋放鎖,其餘全部線程都會加入同步阻塞隊列中,該隊列中記錄了阻塞線程的順序,在加入同步阻塞隊列前有屢次機會能夠搶先執行(非公平鎖),若是沒有被執行到,那麼加入同步阻塞隊列後,就只有頭部節點(哨兵節點)後的阻塞線程有機會獲取到鎖進行邏輯處理。再次查看該方法: 

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // if 代表只有頭部節點(哨兵節點)後的節點在放入同步阻塞隊列前能夠獲取鎖
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 全部線程都被阻塞在這個方法處
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }        

 

2、unlock()方法

一、unlock源碼

    public void unlock() {
        sync.release(1);
    }

  一樣是調用的同步阻塞隊列的方法 sync.release(1),跟進查看源碼:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

二、查看tryRelease()方法:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                // 若是不是自身鎖對象調用unlock()方法的話,就報異常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                // 若是標誌位已經爲0,表示重入鎖已經所有釋放,這將當前獲取鎖的線程設置爲null,以便其餘線程進行加鎖
                setExclusiveOwnerThread(null);
            }
            // 更新重入鎖解鎖到達的次數,若是C不爲0,表示還有重入鎖unlock()沒有調用完
            setState(c);
            return free;
        }        

三、若是tryRelease()方法成功執行,表示以前獲取鎖的線程已經執行完全部須要同步的代碼(重入鎖也徹底退出),那麼就須要喚醒同步阻塞隊列中的第一個等待的線程(也是等待最久的線程),執行unparkSuccessor(h)方法:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        // 先獲取頭結點(哨兵節點)的waitStatus狀態,若是小於0,則能夠獲取鎖,並將waitStatus的狀態設置爲0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            // 若是哨兵節點的下一個節點爲null,或者狀態爲1表示已經取消,則依次循環尋找後面節點,直至找到一個waitStatus<0的節點,並將該節點設置爲須要獲取鎖的節點
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 將該node節點的線程解鎖,容許它去獲取鎖,而後執行業務邏輯
 LockSupport.unpark(s.thread);
    }

 

3、unlock()方法調用後,會到lock()方法阻塞的地方,完成喚醒工做

一、在上面方法 unparkSuccessor(Node node) 中執行完 LockSupport.unpark(s.thread) 後在同步阻塞隊列後的第一個 node 關聯的線程將被喚醒,即unlock()方法代碼執行完,將會到lock() 源碼解析的 2.3 步裏,第三次貼該處源碼:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

二、在上面放大的紅色方法中,以前上面lock()源碼講了當中全部線程都被阻塞了,以下面源碼紅色標記的地方:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

三、全部未獲取到鎖的線程都在 parkAndCheckInterrupt() 方法處阻塞着,因此咱們即將喚醒的哨兵節點後的第一個阻塞線程也是在該處阻塞着,在執行完 unlock() 源碼步驟第3步unparkSuccessor(Node node)中的方法,則將返回到以前阻塞線程的這個方法 parkAndCheckInterrupt()的這行代碼 LockSupport.park(this) 的下一步執行 Thread.interrupted(),由於線程沒有被打斷,因此返回false,故acquireQueued(final Node node, int arg)方法中繼續輪訓再次嘗試acquireQueued(final Node node, int arg)獲取鎖,由於第一個線程已經釋放鎖,因此第二個線程能夠獲取鎖了,並在執行完後返回interrupted爲false,表示線程不是被中斷的。理,其餘線程也在parkAndCheckInterrupt()這個方法中中斷着,等待被第二個線程喚醒。

 

總結:

  在第一個 A 線程 lock() 獲取到鎖後,第一個線程會在底層的同步阻塞隊列中設置鎖狀態 state 爲1(若是重入鎖屢次獲取 state 每次加1),並設置擁有當前鎖的線程爲自身A線程,其餘線程 B/C/D 來獲取鎖的時候就會比較鎖狀態是否爲0,若是不爲0,表示已經被獲取了鎖,再次比較獲取鎖的線程是否爲自身,若是爲自身則對 state 加1(知足重入鎖的規則),不然這將當前未獲取到鎖的線程放入同步阻塞隊列中,在放入的過程當中,須要設置 head 哨兵節點和 tail 尾節點,以及相應的 waitStatus 狀態,而且在放入過程當中須要設置當前節點以及先關節點的 prev 和 next 節點,從而達到雙向隊列的效果,存放到阻塞隊列後,線程會被阻塞到這樣一個方法中 parkAndCheckInterrupt(),等待被喚醒。

  在第一個 A 線程執行完畢,調用 unlock() 解鎖後,unlock() 方法會從同步阻塞隊列的哨兵節點後的第一個節點獲取等待解鎖的線程B,並將其解鎖,而後就會到B阻塞的方法 parkAndCheckInterrupt() 來繼續執行,調用 selfInterrupt()方法,最終調用底層的 c語言的喚醒方法,使得 B 線程完成 lock() 方法獲取到鎖,而後執行業務邏輯。其餘線程以此類推,依次發生阻塞和喚醒。

 

根據源碼總結的 lock() 和 unlock() 的原理,歡迎大神批評指正,若有恰好的看法,也歡迎你們相互交流。

相關文章
相關標籤/搜索