死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

問題

(1)重入鎖是什麼?java

(2)ReentrantLock如何實現重入鎖?node

(3)ReentrantLock爲何默認是非公平模式?c#

(4)ReentrantLock除了可重入還有哪些特性?源碼分析

簡介

Reentrant = Re + entrant,Re是重複、又、再的意思,entrant是enter的名詞或者形容詞形式,翻譯爲進入者或者可進入的,因此Reentrant翻譯爲可重複進入的、可再次進入的,所以ReentrantLock翻譯爲重入鎖或者再入鎖。性能

重入鎖,是指一個線程獲取鎖以後再嘗試獲取鎖時會自動獲取鎖。學習

在Java中,除了ReentrantLock之外,synchronized也是重入鎖。ui

那麼,ReentrantLock的可重入性是怎麼實現的呢?this

繼承體系

qrcode

ReentrantLock實現了Lock接口,Lock接口裏面定義了java中鎖應該實現的幾個方法:線程

// 獲取鎖
void lock();
// 獲取鎖(可中斷)
void lockInterruptibly() throws InterruptedException;
// 嘗試獲取鎖,若是沒獲取到鎖,就返回false
boolean tryLock();
// 嘗試獲取鎖,若是沒獲取到鎖,就等待一段時間,這段時間內還沒獲取到鎖就返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 釋放鎖
void unlock();
// 條件鎖
Condition newCondition();

Lock接口中主要定義了 獲取鎖、嘗試獲取鎖、釋放鎖、條件鎖等幾個方法。翻譯

源碼分析

主要內部類

ReentrantLock中主要定義了三個內部類:Sync、NonfairSync、FairSync。

abstract static class Sync extends AbstractQueuedSynchronizer {}

static final class NonfairSync extends Sync {}

static final class FairSync extends Sync {}

(1)抽象類Sync實現了AQS的部分方法;

(2)NonfairSync實現了Sync,主要用於非公平鎖的獲取;

(3)FairSync實現了Sync,主要用於公平鎖的獲取。

在這裏咱們先不急着看每一個類具體的代碼,等下面學習具體的功能點的時候再把全部方法串起來。

主要屬性

private final Sync sync;

主要屬性就一個sync,它在構造方法中初始化,決定使用公平鎖仍是非公平鎖的方式獲取鎖。

主要構造方法

// 默認構造方法
public ReentrantLock() {
    sync = new NonfairSync();
}
// 本身可選擇使用公平鎖仍是非公平鎖
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

(1)默認構造方法使用的是非公平鎖;

(2)第二個構造方法能夠本身決定使用公平鎖仍是非公平鎖;

上面咱們分析了ReentrantLock的主要結構,下面咱們跟着幾個主要方法來看源碼。

lock()方法

彤哥貼心地在每一個方法的註釋都加上方法的來源。

公平鎖

這裏咱們假設ReentrantLock的實例是經過如下方式得到的:

ReentrantLock reentrantLock = new ReentrantLock(true);

下面的是加鎖的主要邏輯:

// ReentrantLock.lock()
public void lock() {
    // 調用的sync屬性的lock()方法
    // 這裏的sync是公平鎖,因此是FairSync的實例
    sync.lock();
}
// ReentrantLock.FairSync.lock()
final void lock() {
    // 調用AQS的acquire()方法獲取鎖
    // 注意,這裏傳的值爲1
    acquire(1);
}
// AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
    // 嘗試獲取鎖
    // 若是失敗了,就排隊
    if (!tryAcquire(arg) &&
        // 注意addWaiter()這裏傳入的節點模式爲獨佔模式
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// ReentrantLock.FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 當前線程
    final Thread current = Thread.currentThread();
    // 查看當前狀態變量的值
    int c = getState();
    // 若是狀態變量的值爲0,說明暫時尚未人佔有鎖
    if (c == 0) {
        // 若是沒有其它線程在排隊,那麼當前線程嘗試更新state的值爲1
        // 若是成功了,則說明當前線程獲取了鎖
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 當前線程獲取了鎖,把本身設置到exclusiveOwnerThread變量中
            // exclusiveOwnerThread是AQS的父類AbstractOwnableSynchronizer中提供的變量
            setExclusiveOwnerThread(current);
            // 返回true說明成功獲取了鎖
            return true;
        }
    }
    // 若是當前線程自己就佔有着鎖,如今又嘗試獲取鎖
    // 那麼,直接讓它獲取鎖並返回true
    else if (current == getExclusiveOwnerThread()) {
        // 狀態變量state的值加1
        int nextc = c + acquires;
        // 若是溢出了,則報錯
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 設置到state中
        // 這裏不須要CAS更新state
        // 由於當前線程佔有着鎖,其它線程只會CAS把state從0更新成1,是不會成功的
        // 因此不存在競爭,天然不須要使用CAS來更新
        setState(nextc);
        // 當線程獲取鎖成功
        return true;
    }
    // 當前線程嘗試獲取鎖失敗
    return false;
}
// AbstractQueuedSynchronizer.addWaiter()
// 調用這個方法,說明上面嘗試獲取鎖失敗了
private Node addWaiter(Node mode) {
    // 新建一個節點
    Node node = new Node(Thread.currentThread(), mode);
    // 這裏先嚐試把新節點加到尾節點後面
    // 若是成功了就返回新節點
    // 若是沒成功再調用enq()方法不斷嘗試
    Node pred = tail;
    // 若是尾節點不爲空
    if (pred != null) {
        // 設置新節點的前置節點爲如今的尾節點
        node.prev = pred;
        // CAS更新尾節點爲新節點
        if (compareAndSetTail(pred, node)) {
            // 若是成功了,把舊尾節點的下一個節點指向新節點
            pred.next = node;
            // 並返回新節點
            return node;
        }
    }
    // 若是上面嘗試入隊新節點沒成功,調用enq()處理
    enq(node);
    return node;
}
// AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
    // 自旋,不斷嘗試
    for (;;) {
        Node t = tail;
        // 若是尾節點爲空,說明還未初始化
        if (t == null) { // Must initialize
            // 初始化頭節點和尾節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 若是尾節點不爲空
            // 設置新節點的前一個節點爲如今的尾節點
            node.prev = t;
            // CAS更新尾節點爲新節點
            if (compareAndSetTail(t, node)) {
                // 成功了,則設置舊尾節點的下一個節點爲新節點
                t.next = node;
                // 並返回舊尾節點
                return t;
            }
        }
    }
}
// AbstractQueuedSynchronizer.acquireQueued()
// 調用上面的addWaiter()方法使得新節點已經成功入隊了
// 這個方法是嘗試讓當前節點來獲取鎖的
final boolean acquireQueued(final Node node, int arg) {
    // 失敗標記
    boolean failed = true;
    try {
        // 中斷標記
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 當前節點的前一個節點
            final Node p = node.predecessor();
            // 若是當前節點的前一個節點爲head節點,則說明輪到本身獲取鎖了
            // 調用ReentrantLock.FairSync.tryAcquire()方法再次嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                // 嘗試獲取鎖成功
                // 這裏同時只會有一個線程在執行,因此不須要用CAS更新
                // 把當前節點設置爲新的頭節點
                setHead(node);
                // 並把上一個節點從鏈表中刪除
                p.next = null; // help GC
                // 未失敗
                failed = false;
                return interrupted;
            }
            // 是否須要阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 真正阻塞的方法
                parkAndCheckInterrupt())
                // 若是中斷了
                interrupted = true;
        }
    } finally {
        // 若是失敗了
        if (failed)
            // 取消獲取鎖
            cancelAcquire(node);
    }
}
// AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
// 這個方法是在上面的for()循環裏面調用的
// 第一次調用會把前一個節點的等待狀態設置爲SIGNAL,並返回false
// 第二次調用纔會返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 上一個節點的等待狀態
    // 注意Node的waitStatus字段咱們在上面建立Node的時候並無指定
    // 也就是說使用的是默認值0
    // 這裏把各類等待狀態再貼出來
    //static final int CANCELLED =  1;
    //static final int SIGNAL    = -1;
    //static final int CONDITION = -2;
    //static final int PROPAGATE = -3;
    int ws = pred.waitStatus;
    // 若是等待狀態爲SIGNAL(等待喚醒),直接返回true
    if (ws == Node.SIGNAL)
        return true;
    // 若是前一個節點的狀態大於0,也就是已取消狀態
    if (ws > 0) {
        // 把前面全部取消狀態的節點都從鏈表中刪除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 若是前一個節點的狀態小於等於0,則把其狀態設置爲等待喚醒
        // 這裏能夠簡單地理解爲把初始狀態0設置爲SIGNAL
        // CONDITION是條件鎖的時候使用的
        // PROPAGATE是共享鎖使用的
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
    // 阻塞當前線程
    // 底層調用的是Unsafe的park()方法
    LockSupport.park(this);
    // 返回是否已中斷
    return Thread.interrupted();
}

看過以前彤哥寫的【死磕 java同步系列之本身動手寫一個鎖Lock】的同窗看今天這個加鎖過程應該思路會比較清晰。

下面咱們看一下主要方法的調用關係,能夠跟着個人 → 層級在腦海中大概過一遍每一個方法的主要代碼:

ReentrantLock#lock()
->ReentrantLock.FairSync#lock() // 公平模式獲取鎖
  ->AbstractQueuedSynchronizer#acquire() // AQS的獲取鎖方法
    ->ReentrantLock.FairSync#tryAcquire() // 嘗試獲取鎖
    ->AbstractQueuedSynchronizer#addWaiter()  // 添加到隊列
	  ->AbstractQueuedSynchronizer#enq()  // 入隊
    ->AbstractQueuedSynchronizer#acquireQueued() // 裏面有個for()循環,喚醒後再次嘗試獲取鎖
      ->AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() // 檢查是否要阻塞
      ->AbstractQueuedSynchronizer#parkAndCheckInterrupt()  // 真正阻塞的地方

獲取鎖的主要過程大體以下:

(1)嘗試獲取鎖,若是獲取到了就直接返回了;

(2)嘗試獲取鎖失敗,再調用addWaiter()構建新節點並把新節點入隊;

(3)而後調用acquireQueued()再次嘗試獲取鎖,若是成功了,直接返回;

(4)若是再次失敗,再調用shouldParkAfterFailedAcquire()將節點的等待狀態置爲等待喚醒(SIGNAL);

(5)調用parkAndCheckInterrupt()阻塞當前線程;

(6)若是被喚醒了,會繼續在acquireQueued()的for()循環再次嘗試獲取鎖,若是成功了就返回;

(7)若是不成功,再次阻塞,重複(3)(4)(5)直到成功獲取到鎖。

以上就是整個公平鎖獲取鎖的過程,下面咱們看看非公平鎖是怎麼獲取鎖的。

非公平鎖

// ReentrantLock.lock()
public void lock() {
    sync.lock();
}
// ReentrantLock.NonfairSync.lock()
// 這個方法在公平鎖模式下是直接調用的acquire(1);
final void lock() {
    // 直接嘗試CAS更新狀態變量
    if (compareAndSetState(0, 1))
        // 若是更新成功,說明獲取到鎖,把當前線程設爲獨佔線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
// ReentrantLock.NonfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 調用父類的方法
    return nonfairTryAcquire(acquires);
}
// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 若是狀態變量的值爲0,再次嘗試CAS更新狀態變量的值
        // 相對於公平鎖模式少了!hasQueuedPredecessors()條件
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

相對於公平鎖,非公平鎖加鎖的過程主要有兩點不一樣:

(1)一開始就嘗試CAS更新狀態變量state的值,若是成功了就獲取到鎖了;

(2)在tryAcquire()的時候沒有檢查是否前面有排隊的線程,直接上去獲取鎖才無論別人有沒有排隊呢;

總的來講,相對於公平鎖,非公平鎖在一開始就多了兩次直接嘗試獲取鎖的過程。

lockInterruptibly()方法

支持線程中斷,它與lock()方法的主要區別在於lockInterruptibly()獲取鎖的時候若是線程中斷了,會拋出一個異常,而lock()不會管線程是否中斷都會一直嘗試獲取鎖,獲取鎖以後把本身標記爲已中斷,繼續執行本身的邏輯,後面也會正常釋放鎖。

題外話:

線程中斷,只是在線程上打一箇中斷標誌,並不會對運行中的線程有什麼影響,具體須要根據這個中斷標誌幹些什麼,用戶本身去決定。

好比,若是用戶在調用lock()獲取鎖後,發現線程中斷了,就直接返回了,而致使沒有釋放鎖,這也是容許的,可是會致使這個鎖一直得不到釋放,就出現了死鎖。

lock.lock();

if (Thread.currentThread().interrupted()) {
    return ;
}

lock.unlock();

固然,這裏只是舉個例子,實際使用確定是要把lock.lock()後面的代碼都放在try...finally...裏面的以保證鎖始終會釋放,這裏主要是爲了說明線程中斷只是一個標誌,至於要作什麼徹底由用戶本身決定。

tryLock()方法

嘗試獲取一次鎖,成功了就返回true,沒成功就返回false,不會繼續嘗試。

// ReentrantLock.tryLock()
public boolean tryLock() {
    // 直接調用Sync的nonfairTryAcquire()方法
    return sync.nonfairTryAcquire(1);
}
// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

tryLock()方法比較簡單,直接以非公平的模式去嘗試獲取一次鎖,獲取到了或者鎖原本就是當前線程佔有着就返回true,不然返回false。

tryLock(long time, TimeUnit unit)方法

嘗試獲取鎖,並等待一段時間,若是在這段時間內都沒有獲取到鎖,就返回false。

// ReentrantLock.tryLock()
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    // 調用AQS中的方法
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// AbstractQueuedSynchronizer.tryAcquireNanos()
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 若是線程中斷了,拋出異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先嚐試獲取一次鎖
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
// AbstractQueuedSynchronizer.doAcquireNanos()
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 若是時間已經到期了,直接返回false
    if (nanosTimeout <= 0L)
        return false;
    // 到期時間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            // 若是到期了,就直接返回false
            if (nanosTimeout <= 0L)
                return false;
            // spinForTimeoutThreshold = 1000L;
            // 只有到期時間大於1000納秒,才阻塞
            // 小於等於1000納秒,直接自旋解決就得了
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 阻塞一段時間
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

tryLock(long time, TimeUnit unit)方法在阻塞的時候加上阻塞時間,而且會隨時檢查是否到期,只要到期了沒獲取到鎖就返回false。

unlock()方法

釋放鎖。

// java.util.concurrent.locks.ReentrantLock.unlock()
public void unlock() {
    sync.release(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.release
public final boolean release(int arg) {
    // 調用AQS實現類的tryRelease()方法釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 若是頭節點不爲空,且等待狀態不是0,就喚醒下一個節點
        // 還記得waitStatus嗎?
        // 在每一個節點阻塞以前會把其上一個節點的等待狀態設爲SIGNAL(-1)
        // 因此,SIGNAL的準確理解應該是喚醒下一個等待的線程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// java.util.concurrent.locks.ReentrantLock.Sync.tryRelease
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 若是當前線程不是佔有着鎖的線程,拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若是狀態變量的值爲0了,說明徹底釋放了鎖
    // 這也就是爲何重入鎖調用了多少次lock()就要調用多少次unlock()的緣由
    // 若是不這樣作,會致使鎖不會徹底釋放,別的線程永遠沒法獲取到鎖
    if (c == 0) {
        free = true;
        // 清空佔有線程
        setExclusiveOwnerThread(null);
    }
    // 設置狀態變量的值
    setState(c);
    return free;
}
private void unparkSuccessor(Node node) {
    // 注意,這裏的node是頭節點
    
    // 若是頭節點的等待狀態小於0,就把它設置爲0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 頭節點的下一個節點
    Node s = node.next;
    // 若是下一個節點爲空,或者其等待狀態大於0(實際爲已取消)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從尾節點向前遍歷取到隊列最前面的那個狀態不是已取消狀態的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 若是下一個節點不爲空,則喚醒它
    if (s != null)
        LockSupport.unpark(s.thread);
}

釋放鎖的過程大體爲:

(1)將state的值減1;

(2)若是state減到了0,說明已經徹底釋放鎖了,喚醒下一個等待着的節點;

未完待續,下一章咱們繼續學習ReentrantLock中關於條件鎖的部分

彩蛋

爲何ReentrantLock默認採用的是非公平模式?

答:由於非公平模式效率比較高。

爲何非公平模式效率比較高?

答:由於非公平模式會在一開始就嘗試兩次獲取鎖,若是當時正好state的值爲0,它就會成功獲取到鎖,少了排隊致使的阻塞/喚醒過程,而且減小了線程頻繁的切換帶來的性能損耗。

非公平模式有什麼弊端?

答:非公平模式有可能會致使一開始排隊的線程一直獲取不到鎖,致使線程餓死。

推薦閱讀

  1. 死磕 java同步系列之AQS起篇

  2. 死磕 java同步系列之本身動手寫一個鎖Lock

  3. 死磕 java魔法類之Unsafe解析

  4. 死磕 java同步系列之JMM(Java Memory Model)

  5. 死磕 java同步系列之volatile解析

  6. 死磕 java同步系列之synchronized解析


歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。

qrcode

相關文章
相關標籤/搜索