Java讀源碼之ReentrantLock

前言

ReentrantLock 可重入鎖,應該是除了 synchronized 關鍵字外用的最多的線程同步手段了,雖然JVM維護者瘋狂優化 synchronized 使其已經擁有了很好的性能。但 ReentrantLock 仍有其存在價值,例如能夠感知線程中斷,公平鎖模式,能夠指定超時時間的搶鎖等更細粒度的控制都是目前的 synchronized 作不到的。html

若是不是很瞭解 Java 中線程的一些基本概念,能夠看以前這篇:java

Java讀源碼之Threadnode

案例

用一個最簡單的案例引出咱們的主角c#

public class ReentrantLockDemo {

    // 默認是非公平鎖和 synchronized 同樣
    private static ReentrantLock reentrantLock = new ReentrantLock();

    public void printThreadInfo(int num) {
        reentrantLock.lock();
        try {
            System.out.println(num + " : " + Thread.currentThread().getName());
            System.out.println(num + " : " + Thread.currentThread().toString());
        } finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        IntStream.rangeClosed(0, 3)
                .forEach(num -> executorService
                        .execute(() -> new ReentrantLockDemo().printThreadInfo(num))
                );
    }

    /**
     * 輸出:
     * 0 : pool-1-thread-1
     * 0 : Thread[pool-1-thread-1,5,main]
     * 3 : pool-1-thread-4
     * 3 : Thread[pool-1-thread-4,5,main]
     * 1 : pool-1-thread-2
     * 1 : Thread[pool-1-thread-2,5,main]
     * 2 : pool-1-thread-3
     * 2 : Thread[pool-1-thread-3,5,main]
     */

能夠看到使用起來也很簡單,並且達到了同步的效果。廢話很少說一塊兒來瞅一瞅 lock() 和 unlock() 兩個同步方法是怎麼實現的。源碼分析

源碼分析

公平鎖與非公平鎖

公平鎖顧名思義。就是每一個線程排隊搶佔鎖資源。而非公平鎖線程何時能執行更多的看緣分,例如一個線程須要執行臨界區代碼,無論以前有多少線程在等,直接去搶鎖,說白了就是插隊。對於 ReentrantLock 的實現,從構造器看出,當咱們傳入 true 表明選擇了公平鎖模式性能

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

爲何先看公平鎖實現,而不是默認的非公平鎖,由於 synchronized 就是非公平鎖,1.7開始 synchronized 的實現改變了,而且基本借鑑了 ReentrantLock 的實現,加入了自旋,偏向鎖減小系統調用,因此若是須要非公平鎖且不須要特別精細的控制,徹底沒有必要由於性能選擇 ReentrantLock 了。優化

AQS 結構

從案例中的 lock 方法進入ui

  • ReentrantLock.FairSync#lock
final void lock() {
    // 要一把鎖,向誰要鎖?
    acquire(1);
}

在繼續深刻以前讓咱們先熟悉一下 AbstractQueuedSynchronizer(AKA :AQS) 的結構this

  • 首先繼承了 AbstractOwnableSynchronizer ,主要屬性:
// 保存當前持有鎖的線程
private transient Thread exclusiveOwnerThread;
  • AQS 自身主要屬性:
// 阻塞隊列的頭
private transient volatile Node head;

// 阻塞隊列的尾
private transient volatile Node tail;

// 同步器的狀態
private volatile int state;

從 head 和 tail 能夠猜測到,AQS 應該是用一個鏈表做爲等待隊列,給等待的線程排隊, status 字段默認是0,一旦鎖被某個線程佔有就 +1,那爲啥要用int呢? 若是當前持有鎖的這個線程(exclusiveOwnerThread)還要再來把鎖,那狀態還能夠繼續 +1,也就實現了可重入。線程

  • 上面的 Node 節點長啥樣呢,不要被註釋中 CLH 鎖高大上的名稱嚇到,其實就是雙向鏈表,主要屬性:
// 標識次節點是共享模式
static final Node SHARED = new Node();
// 標識次節點是獨佔模式
static final Node EXCLUSIVE = null;
// 節點裏裝着排隊的線程
volatile Thread thread;
// 節點裏裝的線程放棄了,不搶鎖了,可能超時了,可能中斷了
static final int CANCELLED =  1;
// 下一個節點裏的線程等待被通知出隊
static final int SIGNAL    = -1;
// 節點裏裝的線程在等待執行條件,結合 Condition 使用
static final int CONDITION = -2;
// 節點狀態須要被傳播到下一個節點,主要用在共享模式
static final int PROPAGATE = -3;
// 標識節點的等待狀態,初始0,取值是上面的 -3 ~ 1
volatile int waitStatus;
// 前一個節點
volatile Node prev;
// 後一個節點
volatile Node next;
// 指向下一個等待條件 Condition
Node nextWaiter;

去掉一些普通狀況不會涉及的屬性,若是有四個線程競爭,結構以下圖所示:

能夠看到就是一個標準的頭節點爲空的雙鏈表,爲何頭節點是空?

公平鎖加鎖

  • AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    // 若是嘗試拿鎖沒成功,那就進等待隊列
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 檢測到線程被中斷了,由於重置了中斷信號但沒作處理,再設置下中斷位,讓用戶去處理,中斷標準操做
        selfInterrupt();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
  • ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 取AQS的 state 值
    int c = getState();
    // 當前沒有線程持有鎖
    if (c == 0) {
        // 若是沒有其餘線程在排隊(公平)
        if (!hasQueuedPredecessors() &&
            // 這裏可能存在競爭 CAS 試着去搶一次鎖
            compareAndSetState(0, acquires)) {
            // 搶到鎖了,把鎖持有者改爲本身,其餘線程日後稍稍
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 鎖已經被持有了,但若是鎖主人就是本身,那歡迎光臨(可重入)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 由於其餘線程進不來,這裏不存在競爭,直接改鎖狀態
        setState(nextc);
        return true;
    }
    return false;
}
  • AbstractQueuedSynchronizer#hasQueuedPredecessors
// 返回 false 表明不須要排隊,true 表明要排隊
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    // h == t 頭等於尾只多是剛初始的狀態或者已經沒有節點等待了
    // h.next == null ? 下面介紹進隊的過程當中,若是其餘線程與此同時 tryAcquire 成功了,會把以前的head.next置爲空,說明被捷足先登了,差一點惋惜
    // 若是到最後一個判斷了,也就是隊列中至少有一個等待節點,直接看第一個等待節點是否是本身,若是不是本身就乖乖排隊去
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • AbstractQueuedSynchronizer#addWaiter

tryAcquire若是沒有拿到鎖,就須要進等待隊列了,變成一個 Node 實例

// 這裏 mode 爲獨佔模式
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 若是尾節點不爲空,說明等待隊列已經初始化過
    if (pred != null) {
        node.prev = pred;
        // 嘗試把本身放到隊尾
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 進來這裏說明,等待隊列沒有被初始化過,或者嘗試失敗了
    enq(node);
    return node;
}
  • AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 若是尾節點是空,說明隊列沒有初始化
        if (t == null) {
            // 初始化一個空節點(延遲加載),head ,tail都指向它
            if (compareAndSetHead(new Node()))
                tail = head;
        } 
        // 一直嘗試把本身塞到隊尾(自旋)
        else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • AbstractQueuedSynchronizer#acquireQueued

addWaiter方法已經把等待線程包裝成節點放到等待隊列了,爲啥要返回中斷標識呢?主要是爲了給一些須要處理中斷的方式複用,例如 ReentrantLock#lockInterruptibly,以及帶超時的鎖

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 這邊邏輯開始繞起來了
        for (;;) {
            // 拿前一個節點
            final Node p = node.predecessor();
            // 前一個節點是head,說明本身排在第一個
            if (p == head && 
                // 在讓出cpu前再試一次,此時可能鎖持有者已經讓位了
                tryAcquire(arg)) {
                // 搶到鎖了
                setHead(node);
                // 把以前沒用的頭節點釋放
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 兩次嘗試都失敗了,只能準備被掛起,讓出cpu了(調了內核,重量級)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 普通的鎖不處理中斷異常,不會進這個方法
        if (failed)
            cancelAcquire(node);
    }
}

private void setHead(Node node) {
    // 把頭節點設爲本身
    head = node;
    // 由於已經搶到鎖了,不須要記錄這個線程在等待了,保持了頭節點中線程永遠爲 null
    node.thread = null;
    node.prev = null;
}
  • AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 已經告訴前一個節點本身須要被通知了
        return true;
    if (ws > 0) {
        // 只有 CANCELLED 這個狀態大於0,若是前面的節點不排隊了,就一直找到一個沒 CANCELLED 的
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } 
    // 進到這裏,只剩下PROPAGATE(共享鎖時候纔有) CONDITION(本文不涉及) 和 未賦值狀態也就是0, 
    else {
        // 這裏把 默認狀態0 置爲 -1,也就表明着後面有線程在等着被喚醒了
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 返回false,就暫時不會讓線程掛起,繼續自旋,直到返回true
    return false;
}
  • AbstractQueuedSynchronizer#parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
    // 掛起,標準用法this充當blocker 
    LockSupport.park(this);
    // 一旦恢復,返回線程在掛起階段是否被中斷,此方法會重置中斷位
    return Thread.interrupted();
}

到這裏加鎖流程就介紹差很少了,用一個最簡單流程的圖來總結一下:

公平鎖解鎖

  • AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    // 嘗試釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 若是等待隊列已經被初始化過,而且後面有節點等待操做
        if (h != null && h.waitStatus != 0)
            // 恢復掛起的線程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • ReentrantLock.FairSync#tryRelease
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 能執行釋放鎖的確定是鎖的持有者,除非虛擬機魔怔了
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 考慮可重入
    if (c == 0) {
        free = true;
        // 鎖如今沒有持有者了
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
  • ReentrantLock.FairSync#unparkSuccessor
// node 是頭節點
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 若是狀態不是 CANCELED,就把狀態置爲初始狀態 
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // s == null 這個條件成立主要是在共享模式下自旋釋放。
    if (s == null || s.waitStatus > 0) {
        // 把 CANCELED 狀態的節點置爲空
        s = null;
        // 由於 head 這條路已經斷了,從尾巴開始找到第一個排隊的節點,而後把隊列接上
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 把第一個排隊的節點中的線程喚醒,
        LockSupport.unpark(s.thread);
}

線程從加鎖代碼裏介紹的 AbstractQueuedSynchronizer#parkAndCheckInterrupt 方法中醒來,繼續自旋拿鎖。若是此時後面還有人排隊就必定能拿到鎖了。如圖所示:

非公平鎖加鎖

  • ReentrantLock.NonfairSync#lock
final void lock() {
    // 無論三七二十一,直接搶鎖,若是運氣好,鎖正好被釋放了,就不排隊了
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 和上面介紹的公平鎖同樣,只是 tryAcquire 實現不同
        acquire(1);
}
  • ReentrantLock.Sync#nonfairTryAcquire

上面公平鎖咱們已經知道,線程真正掛起前會嘗試兩次,因爲不考慮別人有沒有入隊,實現很是簡單

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 若是沒有線程持有鎖,直接搶鎖
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是是重入,狀態累加
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平鎖解鎖

由於都是獨佔鎖模式,解鎖和公平鎖邏輯同樣。

總結

至此,總算看完了 ReentrantLock 常規的加鎖解鎖源碼,好好體會下 AQS 的結構,仍是能看懂的,且頗有收穫,總之 Doug Lea 大神牛B。

本文仍是挖了不少坑的:

  • 帶超時的鎖是如何實現的?
  • 檢測中斷的鎖是如何實現的?
  • 各類 Condition 是如何實現的?
  • 鎖的共享模式是如何實現的?

之後有時間再一探究竟吧。

相關文章
相關標籤/搜索