追蹤解析 ReentrantLock 源碼

零 前期準備

0 FBI WARNING

文章異常囉嗦且繞彎。java

1 版本

JDK 版本 : OpenJDK 11.0.1node

IDE : idea 2018.3併發

2 ReentrantLock 簡介

ReentrantLock 是 jdk 中經典的高性能重用鎖,做爲基礎組件常常能在 jdk 的其它併發框架中看到。框架

筆者但願可以經過此次代碼閱讀弄懂 AbstractQueueSynchronizer 和 ReentrantLock 的基本構造。ide

本文特指非公平鎖的代碼實現,對於公平鎖暫不作詳解。工具

注意,ReentrantLock 在 jdk11 中相比 jdk8 (這樣作比較是由於 jdk8 是目前工做中最經常使用的版本) 有了一些代碼上的改變,筆者認爲代碼更加精簡了,可是具體性能沒有作過實戰測試和系統比較。性能

3 Demo

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    //建立一個 lock
    private ReentrantLock lock = new ReentrantLock();

    public void demo(){
        //上鎖
        lock.lock();
        //打印當前線程的線程名
        System.out.println(Thread.currentThread().getName());
        //讓線程休眠十秒,在此期間線程不會交出鎖,因此其它調用該方法的線程都會阻塞
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) { }
        //釋放鎖
        lock.unlock();
    }

    //main 方法
    public static void main(String[] args) {
        
        LockDemo demo = new LockDemo();
        //建立兩個線程來調用同一個 demo 實例對象,就能看出鎖的做用了
        new Thread(new LockDemoRunner(demo)).start();
        new Thread(new LockDemoRunner(demo)).start();

    }
}

//Runnable 實現類,用於建立線程對象
class LockDemoRunner implements Runnable{
    //多個線程對象公用一個 demo 的實例
    private LockDemo lockDemo;
    //構造器
    LockDemoRunner(LockDemo demo){ lockDemo = demo;}
    @Override
    public void run() {
        lockDemo.demo();
    }
}

一 Sync

先來看一下 ReentrantLock 的默認構造器:學習

//ReentrantLock.class
public ReentrantLock() {
    sync = new NonfairSync();
}

再來看一下 ReentrantLock 的加鎖和解鎖方法:測試

//ReentrantLock.class
public void lock() {
    sync.acquire(1);
}

//ReentrantLock.class
public void unlock() {
    sync.release(1);
}

能夠發現,ReentrantLock 的邏輯主要由 sync 對象實現。ui

而 sync 則是一個 NonfairSync 類型的對象。NonfaireSync 是 ReentrantLock 的靜態內部類:

//ReentrantLock.class
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    
    //此方法用於嘗試去獲取鎖
    //tryAcquire(...) 原本是 AbstractQueuedSynchronizer 中的方法,此處爲重寫
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

其實 NonfairSync 的主要邏輯在其父類 Sync 中實現。Sync 一樣也是 ReentrantLock 的靜態內部類。

Sync 中比較重要的是兩個分別用於加解鎖的方法:

//Sync.class
//此方法用於加鎖
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
    //獲取當前線程的線程實例對象
    final Thread current = Thread.currentThread();
    //獲取鎖的狀態
    //c 的初始狀態值爲 0,意思是還未上鎖
    int c = getState();
    if (c == 0) {
        //compareAndSetState(...) 方法是 AbstractQueuedSynchronizer 中很是重要的方法
        //用以更新鎖狀態
        //此處的 acquires = 1
        if (compareAndSetState(0, acquires)) {
            //保存線程對象
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        //若是當前線程就是正在執行的線程,則將目前的狀態值 status 與 acquires 相加,保存爲新的 status
        int nextc = c + acquires;
        //邏輯上此處是不會爲負數的,只用於嚴謹邏輯
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

另外一個方法:

//Sync.class
//此方法用於嘗試解鎖
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //若是當前線程並非正在執行的線程,則沒有權限去解鎖,會直接報錯
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        //鎖解開以後將保存的當前線程對象置空
        setExclusiveOwnerThread(null);
    }
    //此處更新狀態值
    setState(c);
    return free;
}

Sync 中的其它方法主要用於狀態判斷,都比較簡單,再也不累述。

二 AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 是 jdk 中用來構建同步組件的框架類,是 Sync 的父類。

以前解析 ThreadPoolExecutor 的內部類 Worker 的時候也初步瞭解過。

在上述探索 Sync 的過程當中用到了不少此類中的方法。因此對用到的方法作一個追蹤。

1 status

AbstractQueuedSynchronizer 中最重要的一個狀態控制變量是 status,它表明鎖目前是被使用仍是空閒:

//AbstractQueuedSynchronizer.class
private volatile int state;

獲取和存入 status 的值:

//AbstractQueuedSynchronizer.class
protected final int getState() {
    return state;
}
//AbstractQueuedSynchronizer.class
protected final void setState(int newState) {
    state = newState;
}

以上的 set/get 方法都很簡單和常規,可是實際上 status 的存值主要使用該類中的另外一個方法:

//AbstractQueuedSynchronizer.class
protected final boolean compareAndSetState(int expect, int update) {
    //compareAndSet(...) 這個方法在使用 AtomicInteger 的時候接觸過,可是此處的 STATE 並非 AtomicInteger
    //STATE 是一個定義在 AbstractQueuedSynchronizer 中的 VarHandle 類型的變量
    //VarHandle 是 jdk9 中新增的一個併發工具,目前網上對此工具的分析比較少
    //基本能判斷的是,此工具的做用與 AtomicXXXX 工具類很相似,能提供原子化的操做,並在必定程度上替代 Unsafe

    //此方法用於先比較該實例對象中的 status 是否與第二個參數的值相等,若是是,則將 status 替換成第三個參數的值,返回 true
    //若是不相等,就不進行替換,並返回 false
    return STATE.compareAndSet(this, expect, update);
}

VarHandler 的實例化也比較神奇,能夠作一下了解:

//定義的是 static 對象,能夠被反覆利用,而不是 AtomicXXXX 的模式了
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;

static {
    try {
        //VarHandler 的實例化工廠
        MethodHandles.Lookup l = MethodHandles.lookup();
        //實例化的時候將類 class、參數的名稱、參數的類型 傳入,就能夠建立一個指向此參數的 VarHandler 實例對象了
        STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
        HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
        TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }
    //LockSupport 是一個服務於鎖的靜態工具類
    Class<?> ensureLoaded = LockSupport.class;
}

2 Node

AbstractQueuedSynchronizer 有一個靜態內部類 Node,本質上是一個雙向鏈表的節點對象。

同時 AbstractQueuedSynchronizer 還有兩個 Node 節點對象:

//雙向鏈表的頭結點
private transient volatile Node head;
//雙向鏈表的尾節點
private transient volatile Node tail;

每一個 Node 節點對象的內部儲存有一個 Thread 對象,即爲等待執行的線程的實例化對象:

//Node.class
//此爲 Node 的構造方法,傳入的 nextWaiter 即爲該節點的後一個節點
Node(Node nextWaiter) {
    this.nextWaiter = nextWaiter;
    //THREAD 是一個 VarHandler 對象,用於將線程對象存入當前節點中
    //注意,當前線程被存入了當前的 Node,而不是下一個 Node
    THREAD.set(this, Thread.currentThread());
}

再來看一個增長 Node 的方法:

//AbstractQueuedSynchronizer.class
private Node addWaiter(Node mode) {
    //此時當前線程對象已經被存入 node 對象中
    Node node = new Node(mode);
    
    //比較見名知意,因此不展開細講了,只稍微說起
    //這個 for 循環用於將 node 對象添加到鏈表的尾部,代替掉以前的 tail 對象
    //有一種特殊狀況,即 oldTail 是 null,則表明着該鏈表實際上是空的,沒有任何節點
    //這種狀況下調用 initializeSyncQueue() 方法初始化鏈表,即此時 node 對象既是頭節點也是尾節點
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

3 lock

來看 ReentrantLock 的加鎖方法:

//ReentrantLock.class
public void lock() {
    sync.acquire(1);
}

acquire(...) 方法在 AbstractQueuedSynchronizer 中實現:

//AbstractQueuedSynchronizer.class
public final void acquire(int arg) {
    //此處的判斷條件處,會先調用 tryAcquire(...) 方法去嘗試獲取鎖的使用權
    //若是獲取成功,此處 tryAcquire(...) 方法會返回 true,那麼 !tryAcquire(...) = false,此方法會直接結束
    //若是獲取失敗,此時 !tryAcquire(...) = true,進入 acquireQueued(...) 方法
    //在執行 acquireQueued(...) 方法以前,會先執行 addWaiter(...) 方法,此時當前線程已經被存入尾部節點中
    //Node.EXCLUSIVE 是一個 null 對象
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //selfInterrupt() 會中斷當前線程,使得線程處於等待被喚醒的狀態
        selfInterrupt();
}

再來追蹤一下 acquireQueued(...) 方法:

//AbstractQueuedSynchronizer.class
final boolean acquireQueued(final Node node, int arg) {
    //在本例中此時傳入的 node 是鏈表的尾節點,且是存儲了當前線程的節點對象
    //arg = 1

    boolean interrupted = false;
    try {
        for (;;) {
            //獲取當前節點的前一個節點
            final Node p = node.predecessor();
            //若是 p 節點是頭節點,且當前線程嘗試獲取鎖的使用權成功
            //則讓當前節點成爲頭節點,並刪去原先的頭節點(即 p 節點)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                return interrupted;
            }

            //shouldParkAfterFailedAcquire(...) 方法會根據 node 的前一節點的狀態來判斷該節點是否要被掛起或者喚醒
            //parkAndCheckInterrupt(...) 內部會調用 unsafe 的相關方法掛起節點
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

因而可知,當前節點 node 的最終處理方式是由其的前一節點的狀態來肯定的。Node 內置了幾種狀態值:

//CANCELLED 表明該節點處於取消狀態,該節點不會執行了
static final int CANCELLED =  1;
//SIGNAL 表明該節點的下一節點處於阻塞狀態,會以後被執行
static final int SIGNAL    = -1;
//CONDITION 表明該節點處於阻塞狀態
static final int CONDITION = -2;
//PROPAGATE 表明共享狀態
static final int PROPAGATE = -3;
//還有一種狀態 0,即爲節點的初始狀態

瞭解了狀態以後再來看 shouldParkAfterFailedAcquire(...) 方法:

//AbstractQueuedSynchronizer.class
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //獲取 node 節點的前節點的狀態
    int ws = pred.waitStatus;
    //若是是 SIGNAL,直接返回 true,而後 node 節點會進入被掛起
    if (ws == Node.SIGNAL)
        return true;

    //狀態值大於 0,只多是 CANCELLED 狀態,即此節點已經被廢棄了
    if (ws > 0) {
        //不斷往前遍歷,將中間被廢棄的節點所有剔除出鏈表中
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //若是狀態值並不是 CANCELLED 或者 SIGNAL,在這裏會將狀態值修改爲 SIGNAL
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    //只要 pred 的狀態值不爲 SIGNAL,都會返回 false
    return false;
}

shouldParkAfterFailedAcquire(...) 方法和 parkAndCheckInterrupt(...) 方法共同做用,對不符合的節點進行剔除,對符合要求的節點進行掛起操做。

這樣一來節點所封裝的線程也就進入了阻塞隊列中,等待被鎖喚醒。

4 unlock

回到 ReentrantLock 的解鎖代碼:

//ReentrantLock.class
public void unlock() {
    sync.release(1);
}

release(...) 在 AbstractQueuedSynchronizer 中實現:

//AbstractQueuedSynchronizer.class
public final boolean release(int arg) {
    //此處的 tryRelease(...) 是 Sync 中重寫以後的方法,具體看上述 Sync 的實現
    //此方法修改了當前鎖的狀態值
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

代碼和上一 part 的比較相似,重點來看一下 unparkSuccessor(...) 方法:

//AbstractQueuedSynchronizer.class
private void unparkSuccessor(Node node) {

    //獲取 node 的狀態值
    int ws = node.waitStatus;
    //若是狀態值爲 SIGNAL、CONDITION、PROPAGATE 的話就會轉成 0
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    //獲取 node 的下一節點 s
    Node s = node.next;
    //若是 s 節點不存在,或者已經被廢棄了,就會一直輪詢,找到一個符合條件的
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    //將 s 節點激活
    //注意此處的 s 節點爲 node 的下一個可以被使用的節點
    if (s != null)
        LockSupport.unpark(s.thread);
}

unparkSuccessor(...) 方法的核心是選擇符合要求的下一節點,並將其所表明的線程對象從掛起狀態喚醒。

注意,此處展現的是非公平鎖的邏輯。非公平鎖講求先到先得,會依次喚醒線程並執行代碼塊。公平鎖則一次性喚醒因此線程並進行一次公平爭奪。

三 LockSupport

1 park

LockSupport 的 park(...) 方法用於掛起線程:

//LockSupport.class
public static void park(Object blocker) {
    //獲取當前線程對象
    Thread t = Thread.currentThread();
    //blocker 在本例中即爲鎖對象自己,setBlocker(...) 方法主要是作一下記錄,此線程是被誰阻塞了
    setBlocker(t, blocker);
    //掛起線程
    U.park(false, 0L);
    //當線程執行到這句代碼的時候,說明線程已經從掛起狀態被喚醒了
    //因此這裏能夠清空掉記錄
    setBlocker(t, null);
}

繼續追蹤 setBlocker(...) 方法:

//LockSupport.class
private static void setBlocker(Thread t, Object arg) {
    //putObject(...) 方法會把一個對象存到指定的地址處
    //此例的 t 是當前的線程對象,PARKBLOCKER 是一個 long 類型的內存地址,arg 是鎖對象自己
    U.putObject(t, PARKBLOCKER, arg);
}

能夠看到,park(...) 方法和 setBlocker(...) 方法都是調用了 U 對象的相關方法。

U 對象是一個 Unsafe 實例:

//LockSupport.class
private static final Unsafe U = Unsafe.getUnsafe();

Unsafe 的 park(...) 方法用於掛起線程,putObject(...) 方法用於存入對象。

2 unpark

//LockSupport.class
public static void unpark(Thread thread) {
    //喚醒線程
    if (thread != null)
        U.unpark(thread);
}

四 一點嘮叨

總結一下重入鎖的業務邏輯:

1 當第一個線程進入到鎖區域(即調用 lock() 方法)的時候,會被鎖記錄爲當前線程,而且修改鎖的狀態值

2 當有其它線程進入到該代碼塊,可是鎖的狀態值並非初始值(即以前的線程未釋放鎖資源)的時候,線程會被封裝成節點並存入到鏈表的末尾,而後被掛起

3 以前的線程釋放鎖資源(即調用 unlock() 方法)的時候,鎖會去遍歷內部的鏈表,喚醒下一個符合要求的線程(特指非公平鎖)

本文僅爲我的的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充。

相關文章
相關標籤/搜索