ReentrantLock之公平鎖源碼分析

  本文分析的ReentrantLock所對應的Java版本爲JDK8。java

  在閱讀本文前,讀者應該知道什麼是CAS、自旋。node

本文大綱

  1.ReentrantLock公平鎖簡介
  2.AQS
  3.lock方法
  4.unlock方法app

1. ReentrantLock公平鎖簡介

  ReentrantLock是JUC(java.util.concurrent)包中Lock接口的一個實現類,它是基於AbstractQueuedSynchronizer(下文簡稱AQS)來實現鎖的功能。ReentrantLock的內部類Sync繼承了AbstractQueuedSynchronizer,Sync又有FairSync和NonFairSync兩個子類。FairSync實現了公平鎖相關的操做,NonFairSync實現了非公平鎖相關的操做。它們之間的關係以下:ui

 

  公平鎖的公平之處主要體如今,對於一個新來的線程,若是鎖沒有被佔用,它會判斷等待隊列中是否還有其它的等待線程,若是有的話,就加入等待隊列隊尾,不然就去搶佔鎖。this

  下面這段代碼展現了公平鎖的使用方法:spa

private final Lock lock = new ReentrantLock(true); // 參數true表明建立公平鎖

public void method() { lock.lock(); // block until condition holds
     try { // ... method body
     } finally { lock.unlock(); } }

2. AQS

  下面簡單介紹一下AQS中的Node內部類和幾個重要的成員變量。線程

2.1 Node

  AQS中,維護了一個Node內部類,用於包裝咱們的線程。咱們須要關注Node中的以下屬性:code

  •   pre:當前節點的前驅節點。
  •   next:當前節點的後繼節點。
  •   thread:thread表示被包裝的線程。
  •   waitStatus:waitStatus是一個int整型,能夠被賦予以下幾種值:
static final int CANCELLED =  1; // 線程被取消
static final int SIGNAL  = -1; // 後繼節點中的線程須要被喚醒
static final int CONDITION = -2; // 暫不關注
static final int PROPAGATE = -3; // 暫不關注

  另外,當一個新的Node被建立時,waitStatus是0。orm

2.2 head

  head指向隊列中的隊首元素,能夠理解爲當前持有鎖的線程。blog

2.3 tail

  tail指向隊列中的隊尾元素。

2.4 state

  state表示在ReentrantLock中能夠理解爲鎖的狀態,0表示當前鎖沒有被佔用,大於0的數表示鎖被當前線程重入的次數。例如,當state等於2時,表示當前線程在這把鎖上進入了兩次。

2.5 exclusiveOwnerThread

  表示當前佔用鎖的線程。

2.6 等待隊列

  下圖簡單展現了AQS中的等待隊列:

3. lock方法

  有了上面的AQS的基礎知識後,咱們就能夠展開對ReentrantLock公平鎖的分析了,先從lock方法入手。

  ReentrantLock中的lock方法很簡單,只是調用了Sync類(本文研究公平鎖,因此應該是FairSync類)中的lock方法:

public void lock() { sync.lock(); }

  咱們跟到FairSync的lock方法,代碼也很簡單,調用了AQS中的acquire方法:

final void lock() { acquire(1); }

  acquire方法:

public final void acquire(int arg) { if (!tryAcquire(arg) && // 調用tryAcquire嘗試去獲取鎖,若是獲取成功,則方法結束
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若是獲取鎖失敗,執行acquireQueued方法,將把當前線程排入隊尾
 selfInterrupt(); }

  tryAcquire方法:

protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 獲取鎖的狀態
    if (c == 0) { // 若是狀態是0,表示鎖沒有被佔用
        if (!hasQueuedPredecessors() && // 判斷是隊列中是否有排隊中的線程
            compareAndSetState(0, acquires)) { // 隊列中沒有排隊的線程,則嘗試用CAS去獲取一下鎖
            setExclusiveOwnerThread(current); // 獲取鎖成功,則將當前佔有鎖的線程設置爲當前線程
            return true; } } // 鎖被佔用、隊列中有排隊的線程或者當前線程在獲取鎖的時候失敗將執行下面的代碼
    else if (current == getExclusiveOwnerThread()) { // 當前線程是不是佔有鎖的線程
        int nextc = c + acquires; // 是的話,表示當前線程是重入這把鎖,將鎖的狀態進行加1
        if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 鎖的重入次數超過int可以表示最大的值,拋出異常
        setState(nextc); // 設置鎖的狀態
        return true; } return false; // 沒有獲取到鎖
}

  hasQueuedPredecessors方法:

public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && // 隊列中的隊首和隊尾元素不相同
        ((s = h.next) == null || s.thread != Thread.currentThread()); // 隊列中的第二個元素不爲null,且第二個元素中的線程不是當前線程。這裏若是返回true,說明隊列中至少存在tail、head兩個節點,就會執行acquireQueued將當前線程加入隊尾
}

  若是tryAcquire沒有獲取到鎖,將執行:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

  咱們先分析addWaiter方法:

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 將當前線程包裝成Node,mode參數值爲null,表示獨佔模式 // Try the fast path of enq; backup to full enq on failure
    Node pred = tail; if (pred != null) { node.prev = pred; // 若是隊列中的尾節點不爲空,將當前node的前驅節點設置爲以前隊列中的tail
        if (compareAndSetTail(pred, node)) { // 用CAS把當前node設置爲隊尾元素
            pred.next = node; // 成功的話,則將以前隊尾元素的後繼節點設置爲當前節點。若是這裏不清楚的話,請結合前面講等待隊列的那張圖進行理解。
            return node; } } enq(node); // 隊尾節點爲空,或者用CAS設置隊尾元素失敗,則用自旋的方式入隊
    return node; }

  enq方法:

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) // 隊尾元素爲空,建立一個空的Node,並設置爲隊首
                tail = head; // 設置隊首和隊尾爲同一個空Node,進入下一次循環
        } else { node.prev = t; // 若是隊列中的尾節點不爲空,將當前node的前驅節點設置爲以前隊列中的tail
            if (compareAndSetTail(t, node)) { // 用CAS把當前node設置爲隊尾元素
                t.next = node; // 成功的話,則將以前隊尾元素的後繼節點設置爲當前節點
                return t;
 } } } }

   下面這張圖反應了上面enq方法的處理流程:

  通過上面的方法,當前node已經加入等待隊列的隊尾,接下來將執行acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 獲取node的前驅節點
            if (p == head && tryAcquire(arg)) { // 若是node的前驅是head,它將去嘗試獲取鎖(tryAcquire方法在前面已經分析過)
                setHead(node); // 獲取成功,則將node設置爲head
                p.next = null; // 將以前的head的後繼節點置空
                failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && // 當前node的前驅不是head,將爲當前node找到一個可以將其喚醒的前驅節點;或者當前node的前驅是head,可是獲取鎖失敗
                parkAndCheckInterrupt()) // 將當前線程掛起
                interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

  shouldParkAfterFailedAcquire方法的做用就是找到一個可以喚醒當前node的節點:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 開始時是0
    if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */
        return true; // 前驅節點的狀態是-1,會喚醒後繼節點,能夠將線程掛起
    if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do { node.prev = pred = pred.prev; // 前驅節點中的線程被取消,那就須要一直循環直到找到一個沒有被設置爲取消狀態的前驅節點
        } while (pred.waitStatus > 0); pred.next = node; // 從後向前找,將第一個非取消狀態的節點,設置這個節點的後繼節點設置爲當前node
    } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // waitStatus是0或者-3的時候,這時waitStatus都將被設置爲-1 // 即後繼節點須要前驅節點喚醒
 } return false; // 上層代碼再進行一次循環,下次進入此方法時,將進入第一個if條件
}

  找到了合適的前驅節點,parkAndCheckInterrupt方法當前線程掛起:

private final boolean parkAndCheckInterrupt() { // 將線程掛起,等待前驅節點的喚醒
    LockSupport.park(this); return Thread.interrupted(); }

 4. unlock方法

  ReentrantLock的unlock方法調用AQS中的release方法:

public void unlock() { sync.release(1); // 調用AQS的release方法
}

  release方法:

public final boolean release(int arg) { if (tryRelease(arg)) { // 嘗試去釋放鎖
        Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 釋放鎖成功,head不爲空,而且head的waitStatus不爲0的狀況下,將喚醒後繼節點
        return true; } return false; }

  tryRelease方法:

protected final boolean tryRelease(int releases) { int c = getState() - releases; // 將鎖的狀態減1
    if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 準備釋放鎖的線程不是持有鎖的線程,拋出異常
    boolean free = false; if (c == 0) { free = true; // 鎖的狀態是0,說明不存在重入的狀況了,能夠直接釋放了
        setExclusiveOwnerThread(null); } setState(c); return free; }

  鎖釋放成功,將喚醒後繼節點,unparkSuccessor方法:

private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
    int ws = node.waitStatus; // 注意,這個node是head節點
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 當前node的狀態是小於0,將其狀態設置爲0

    /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; // head節點的後繼節點
    if (s == null || s.waitStatus > 0) { s = null; // 執行到這表示head的後繼節點是1,處於取消的狀態
        for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; // 從等待隊列的隊尾向前找,找到倒序的最後一個處於非取消狀態的節點
 } if (s != null) LockSupport.unpark(s.thread); // 喚醒head後面的處於非取消狀態的第一個(正序)節點
}

  到此,全文結束,你們看代碼的時候結合圖來理解會容易不少。

相關文章
相關標籤/搜索