讀ReentrantLock源碼筆記

1.lock實現

public void lock() {    sync.lock();}

咱們都知道reentrantlock分爲非公平鎖和公平鎖,經過new ReentrantLock(true);能夠變成公平鎖java

咱們這裏分析一下jdk1.8時候的lock的實現node

公平鎖機制下lock調用的源碼以下:ui

final void lock() {    acquire(1);}

流程圖: www.processon.com/view/link/5e3102efe4b05b335ff6b35dthis

public final void acquire(int arg) {
    // 嘗試得到鎖
    if (!tryAcquire(arg) &&
        // 得到鎖失敗則將當前線程變成Node節點add進等待隊列
        // Node.EXCLUSIVE互斥模式、Node.SHARED共享模式
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這裏先看一下tryAcquire的實現spa

protected final boolean tryAcquire(int acquires) {
    // 得到當前線程
    final Thread current = Thread.currentThread();
    // 取出鎖狀態
    int c = getState();
    // 爲0表明還無線程佔用
    if (c == 0) {
        // hasQueuedPredecessors判斷等待隊列是否初始化,簡單來講就是判斷當前線程是不是隊列中的第一個線程
        if (!hasQueuedPredecessors() &&
            // CAS,將狀態+1
            compareAndSetState(0, acquires)) {
            // 設置當前線程爲擁有者線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 判斷當前線程是否爲重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

hasQueuedPredecessors源碼:線程

case 1:當前等待隊列未初始化,若是爲未初始化則head和tail都爲null,那麼h != t確定不成立翻譯

case 2:當前等待隊列中等於1,則也直接h != t不成立3d

case 3:當前等待隊列中大於1,則&&後的判斷,先判斷是否還有下一個節點,而後判斷當前線程是否和隊列中第一個排隊的節點的thread相等,不相等則表明有比當前線程更早的獲取鎖的線程在等待,由於公平鎖須要先來後到的執行。返回true,不會進入if。code

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

咱們在看acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 中addWaiter的實現對象

addWaiter將本身入隊,看源碼實現:

private Node addWaiter(Node mode) {
    // 將當前線程以指定的模式建立節點node
    Node node = new Node(Thread.currentThread(), mode);
    // 獲取當前等待隊列的尾節點
    Node pred = tail;
    // 隊列不爲空,將新的node加入等待隊列中
    if (pred != null) {
        node.prev = pred;
         // CAS方式將當前節點尾插入隊列中
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 當隊列爲empty或者CAS失敗時會調用enq方法處理
    enq(node);
    return node;
}

其中,隊列爲empty,使用enq(node)處理,將當前節點插入等待隊列,若是隊列爲空,則初始化當前隊列。全部操做都是CAS自旋的方式進行,直到成功加入隊尾爲止。
private Node enq(final Node node) {
    // 不斷自旋
    for (;;) {
        Node t = tail;
        // 當前隊列爲empty
        if (t == null) {
            // 完成隊列初始化操做,頭結點中不放數據,只是做爲起始標記,lazy-load,在第一次用的時候new
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // 不斷將當前節點使用CAS尾插入隊列中直到成功爲止
            // compareAndSetTail(t, node) 判斷尾部節點是否是t,是的話就將尾部指向node
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued用於已在隊列中的線程以獨佔且不間斷模式獲取state狀態,直到獲取鎖後返回。主要流程:

  • 結點node進入隊列尾部後,檢查狀態;
  • 調用park()進入waiting狀態,等待unpark()或interrupt()喚醒;
  • 被喚醒後,是否獲取到鎖。若是獲取到,head指向當前結點,並返回從入隊到獲取鎖的整個過程當中是否被中斷過;若是沒獲取到,繼續流程1

Lock類對於鎖的實現不會令線程進入阻塞狀態,Lock底層調用LockSupport.park()方法,使線程進入的是等待狀態。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))的實現:

final boolean acquireQueued(final Node node, int arg) {
  // 是否已獲取鎖的標誌,默認爲true 即爲還沒有
  boolean failed = true;
  try {
      // 等待中是否被中斷過的標記
      boolean interrupted = false;
      for (;;) {
          // 獲取當前節點的前節點
          final Node p = node.predecessor();
          // 若是前節點是頭結點,則意味着本身是第一個排隊的節點,嘗試獲取鎖
          if (p == head && tryAcquire(arg)) {
              // 得到鎖成功後將當前節點設置爲頭結點
              setHead(node);
              p.next = null; // help GC
              failed = false;
              return interrupted;
          }
          // shouldParkAfterFailedAcquire翻譯過來「在獲取鎖失敗以後應該等待」
          // shouldParkAfterFailedAcquire根據對當前節點的前一個節點的狀態進行判斷,對當前節點作出不一樣的操做
          // parkAndCheckInterrupt讓線程進入等待狀態,並檢查當前線程是否被能夠被中斷
          if (shouldParkAfterFailedAcquire(p, node) &&
              parkAndCheckInterrupt())
              interrupted = true;
      }
  } finally {
      // 將當前節點設置爲取消狀態(Node.CANCELLED),爲1
      if (failed)
          cancelAcquire(node);
  }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // waitStatus默認爲0
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 使用CAS將前一個節點狀態由 INITIAL 設置成 SIGNAL(這裏修改以後會自旋再次嘗試得到鎖)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

shouldParkAfterFailedAcquire 方法

  • 主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS將前一個節點狀態由 INITIAL 設置成 SIGNAL,表示當前線程。

    爲何要設置前一個線程的狀態爲SIGNAL而不是本身呢?

    打個比方,你本身在房間睡着的時候知道本身睡着了嗎?固然是不知道,那麼你睡着了能本身關門嘛?不能,那你關了門萬一沒睡呢?因此就須要別人來幫你把門關上。

    你也能夠理解爲修改狀態的操做和使線程睡眠的操做不具備原子性,可能出現修改完狀態以後卻沒睡着的狀況。

  • shouldParkAfterFailedAcquire方法會在死循環中反覆重試,直至compareAndSetWaitStatus 設置節點狀態位爲 SIGNAL 時 shouldParkAfterFailedAcquire 返回 true 時纔會執行方法 parkAndCheckInterrupt 方法。

parkAndCheckInterrupt 該方法的關鍵是會調用 LookSupport.park 方法,該方法是用來當前線程。

waitStatus的各個值都是什麼意思:

靜態變量 描述
Node.CANCELLED 1 節點對應的線程已經被取消了(咱們後邊詳細會說線程如何被取消)
Node.SIGNAL -1 表示後邊的節點對應的線程處於等待狀態
Node.CONDITION -2 表示節點在等待隊列中(稍後會詳細說什麼是等待隊列)
Node.PROPAGATE -3 表示下一次共享式同步狀態獲取將被無條件的傳播下去(稍後再說共享式同步狀態的獲取與釋放時詳細嘮叨)
0 初始狀態

2.interrupt(),interrupted() 和 isInterrupted() 的區別

interrupt():將調用該方法的對象所表示的線程標記一箇中止標記,並非真的中止該線程。

interrupted():獲取當前線程的中斷狀態,而且會清除線程的狀態標記。是一個是靜態方法。

isInterrupted():獲取調用該方法的對象所表示的線程的中斷狀態,不會清除線程的狀態標記。是一個實例方法。

簡單的中斷案例:t1先執行,可是sleep不釋放鎖資源,在這期間t2等候兩秒鐘還沒拿到鎖就中斷

public class Test {
    static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            testsync();
        },"t1");
        Thread t2 = new Thread(() -> {
            testsync();
        },"t2");
        t1.start();
        TimeUnit.SECONDS.sleep(2);
        t2.start();
        TimeUnit.SECONDS.sleep(2);
        System.out.println("main");
        /**
         * 若是t2兩秒鐘還拿不到就中斷
         */
        t2.interrupt();
    }
    public static void testsync(){
        try {
            /**
             * lockInterruptibly 和 lock的區別,前者會直接拋出異常能夠響應中斷,後者則不能夠
             */
            lock.lockInterruptibly();
            System.out.println(Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

lock和lockInterruptibly的區別就在以下:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

重點在doAcquireInterruptibly方法
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 區別在這
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

acquireQueued() 基本相同,惟一的區別是對中斷信號的處理。

acquireQueued() 被中斷後,將中斷標誌傳給外界,外界再調用Thread的interrupt() 復現中斷;而doAcquireInterruptibly() 則直接拋出InterruptedException。

兩者本質上沒什麼不一樣。但doAcquireInterruptibly()顯示拋出了InterruptedException,調用者必須處理或繼續上拋該異常。


3.unlock實現

這裏咱們看release的實現,sync是reentrantlock的一個內部抽象類,繼承了AbstractQueuedSynchronizer

reentrantlock的公平鎖FairSync和非公平鎖NonfairSync都實現了sync

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    // 嘗試釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 當waitStatus不爲0時,它會喚醒等待隊列裏的其餘線程來獲取資源。
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    // 將狀態值減1(由於是可重入鎖,因此釋放鎖的次數和加鎖的次數要一一對應)
    int c = getState() - releases;
    // 判斷當前線程是不是持有鎖的線程
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否成功釋放鎖標誌,默認爲還沒有
    boolean free = false;
    // 當狀態值爲0就會進行解鎖,清空鎖持有線程。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 設置可重入次數爲原始值0
    setState(c);
    return free;
}

咱們繼續看unparkSuccessor():

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 若是waitStatus爲-1,即表示後邊的節點對應的線程處於等待狀態
    if (ws < 0)
        // 將waitStatus改成0
        compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
    }
    if (s != null)
        // 喚醒第一個排隊的線程
        LockSupport.unpark(s.thread);
}

這裏當unpark喚醒下一個線程的時候,咱們以前阻塞的線程就會在acquireQueued這個地方被喚醒,且繼續執行

// parkAndCheckInterrupt 這裏被喚醒後會獲取當前線程的中斷狀態,而且會清除線程的狀態標記。
// 若是沒有被中斷就if不成立,從新進行for循環
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())      


for (;;) {
          // 獲取當前節點的前節點
          final Node p = node.predecessor();
          // 若是前節點是頭結點,則意味着本身是第一個排隊的節點,嘗試獲取鎖
          if (p == head && tryAcquire(arg)) {
              // 成功則將當前節點設置爲頭結點
              setHead(node);
              p.next = null; // help GC
              failed = false;
              // 最後返回false
              return interrupted;
          }
          if (shouldParkAfterFailedAcquire(p, node) &&
              parkAndCheckInterrupt())
              interrupted = true;
      }
相關文章
相關標籤/搜索