從ReentrantLock分析AbstractQueuedSynchronized源碼

1.示例代碼

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();

2.ReentrantLock構造方法

private final Sync sync;

//空構造的狀況建立一個非公平鎖
public ReentrantLock() {
    sync = new NonfairSync();
}

//傳boolean值,true的狀況建立一個公平鎖,false建立一個非公平鎖
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
}

3.Sync,FairSync,AbstractQueuedSynchronizer繼承關係

AbstractQueuedSynchronizer提供了通用方法,但其tryAcquire方法須要子類FairSync本身實現node

4.Node節點

AbstractQueuedSynchronized內部維護了一個雙向鏈表,經過waitStatus表示當前線程等待狀態多線程

具體結構以下圖:測試

阻塞隊列不包含 head 節點,這裏要先有一個概念,爲何呢?由於首節點是一個虛節點的概念,不存儲數據,從addWaiter方法中的enq方法內能夠看出來。在添加一個節點進隊列的時候,第一次添加的時候會構建一個空節點。ui

private Node enq(final Node node) {
    for (;;) {
        //獲取當前尾節點
        Node t = tail;
        //若是當前尾節點爲空
        if (t == null) { 
            //構造一個空節點並賦值給head節點
            if (compareAndSetHead(new Node()))
                //將尾節點指向當前空的首節點
                tail = head;
        } else {
            //循環第二次進來,尾節點一點不爲空
            //設置當前節點的前置節點爲以前的尾節點
            node.prev = t;
            //設置當前節點爲尾節點
            if (compareAndSetTail(t, node)) {
                //設置初始化爲空的尾節點的後繼節點爲當前節點
                t.next = node;
                //返回空節點
                return t;
            }
        }
    }
}
static final class Node {
 //表明節點當前在共享模式下
    static final Node SHARED = new Node();
 //表明當前節點在獨佔模式下
    static final Node EXCLUSIVE = null;

    //當前節點線程取消爭搶鎖
    static final int CANCELLED =  1;
    //當前node節點的後繼節點的線程須要被喚醒
    static final int SIGNAL    = -1;
    //codition隊列節點屬性
    static final int CONDITION = -2;
    //這裏不討論
    static final int PROPAGATE = -3;

    //取值爲上面的1,-1,-2,-3或者默認值0
    volatile int waitStatus;

    //前驅節點
    volatile Node prev;
    
    //後繼節點
    volatile Node next;
    
    //當前node節點對應的線程
    volatile Thread thread;
    
    //
    Node nextWaiter;
    
    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    //獲取隊列的首節點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    
    Node() {    // Used to establish initial head or SHARED marker
    }
    
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }

}

5.state屬性

state值表明當前線程是否獲取鎖this

//AbstractQueuedSynchronizer的核心屬性,大於0說明線程持有鎖,每重入一次該值+1
//所謂的重入能夠理解爲lock.lock()調用屢次
private volatile int state;

//父類AbstractQueuedSynchronizer的方法
//經過UNSAFE類設置內存地址中state的屬性值,當state>=1的時候,說明當前線程持有鎖
//線程每進行一次重入,state值會+1
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

6.從ReentrantLock公平鎖分析AbstractQueuedSynchronized

公平鎖和非公平鎖的區別在於,非公平鎖在入隊以前,進行了2次CAS操做進行state值的設置,而公平鎖只在tryAcquire中首先進行了hasQueuedPredecessors的判斷以後,才進行了CAS操做,以後二者的邏輯同樣,競爭激烈的時候線程都會被構形成Node節點進入阻塞隊列。spa

加鎖過程方法調用圖,方便分析代碼調用過程線程

6.1.加鎖過程分析

6.1.1.AbstractQueuedSynchronized.acquire方法分析

//加鎖核心方法入口
public final void acquire(int arg) {
    //條件1:FairSync.tryAcquire嘗試獲取鎖,獲取成功返回true,失敗返回false
    //條件2:addWaiter將當前線程封裝成Node加入到雙向隊列中
    //acquireQueued爲核心方法,包含阻塞當前線程,清除隊列中CANCELED狀態線程
    //返回true表示
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();//設置當前線程的中斷狀態,這裏涉及interrupt()和interrupted()方法的概念
}

interrupt()只設置線程的中斷狀態爲true,並不能中斷線程;blog

interrupted()測試當前線程是否已經被中斷,並清除線程的中斷狀態;繼承

6.1.2.FairSync.tryAcquire方法分析

aqs將tryAcquire交給子類去實現,這裏示例爲FairSync隊列

//嘗試獲取鎖
protected final boolean tryAcquire(int acquires) {
    //獲取當前線程
    final Thread current = Thread.currentThread();
    //獲取當前線程狀態
    int c = getState();
    //c==0說明當前線程還未獲取鎖
    if (c == 0) {
        //條件1:判斷當前隊列中是否有其餘線程在等待,沒有則返回false
        //條件2:條件1中隊列中沒有其餘線程在等待,嘗試修改state值,修改爲功則表明加鎖成功
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            //設置當前線程爲獨佔線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //c>0說當前線程已經獲取鎖,判斷當前線程和獨佔線程是否相等
    else if (current == getExclusiveOwnerThread()) {
        //將state的值遞增
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        //更新state的值
        setState(nextc);
        return true;
    }
    return false;
}

6.1.3.AbstractQueuedSynchronized.hasQueuedPredecessors方法分析

FairSync.tryAcquire裏面面有一個hasQueuedPredecessors方法,這是和非公平鎖的區別,公平鎖這裏多了這個方法。用於判斷隊列中是否已經有其餘線程在等待,若是沒有,則返回false。由於前面也已經說過head節點不存儲數據,只是一個虛節點,因此判斷隊列中是否有處於等待的節點有如下判斷:

1.h != t,若是head和tail相同,也就不用判斷了,確定沒有處於等待的節點。

2.(s = h.next) == null說明只有首節點,確定沒有處於等待的節點。

3.s.thread != Thread.currentThread(),首節點的next節點的線程不和當前線程同樣。若是相同,說明當前首節點只有一個next節點,也就是隻有一個線程競爭鎖資源,能夠直接經過CAS競爭鎖資源。

這裏有個細節,tail的聲明在head以前,由於根據tail你必定能夠獲取head,可是反過來有head就不必定有tail了。由於head確定是在tail以前初始化的。這樣在多線程競爭狀況下,若是head先聲明,tail後聲明,就會出現head初始化了但tail還未初始化的過程,使得h!=t等式成立。

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

6.1.4.AbstractQueuedSynchronized.addWaiter方法分析

//將當前線程封裝成Node節點並返回,指定爲獨佔模式,則節點的nextWaiter爲NULL
//若是當前節點存在尾節點,則將當前節點加入到隊列中,並設置當前節點爲尾節點
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //將節點加入到隊尾當中
    Node pred = tail;
    //若是尾節點不爲空
    if (pred != null) {
        //設置當前節點的前驅節點爲尾節點
        node.prev = pred;
        //經過cas方式設置當前節點爲尾節點
        if (compareAndSetTail(pred, node)) {
            //以前尾節點的後續節點爲當前節點
            pred.next = node;
            return node;
        }
    }
    //兩種狀況走到這裏:1.尾節點爲空,則隊列爲空;2.經過CAS設置尾節點失敗,存在競爭的時候可能出現
    //自旋方式設置當前節點爲尾節點,若是尾節點爲空,則初始化一個空節點做爲首節點,並將當前節點和空的首
    //節點構成雙向隊列,返回該空節點
    enq(node);
    return node;
}

6.1.5.AbstractQueuedSynchronized.enq方法分析

自旋方式設置當前節點爲尾節點,若是尾節點爲空,則初始化一個空節點做爲首節點,並將當前節點和空的首節點構成雙向隊列,返回該空節點

private Node enq(final Node node) {
    for (;;) {
        //獲取當前尾節點
        Node t = tail;
        //若是當前尾節點爲空
        if (t == null) { 
            //構造一個空節點並賦值給head節點
            if (compareAndSetHead(new Node()))
                //將尾節點指向當前空的首節點
                tail = head;
        } else {
            //循環第二次進來,尾節點一點不爲空
            //設置當前節點的前置節點爲以前的尾節點
            node.prev = t;
            //設置當前節點爲尾節點
            if (compareAndSetTail(t, node)) {
                //設置初始化爲空的尾節點的後繼節點爲當前節點
                t.next = node;
                //返回空節點
                return t;
            }
        }
    }
}

6.1.6.AbstractQueuedSynchronized.acquireQueued方法分析


final boolean acquireQueued(final Node node, int arg) {
    //失敗標識
    boolean failed = true;
    try {
        //中斷標識
        boolean interrupted = false;
        //自旋
        for (;;) {
            //獲取當前節點前驅節點
            final Node p = node.predecessor();
            //若是當前節點爲前驅節點爲首節點,嘗試加鎖
            if (p == head && tryAcquire(arg)) {
                //加鎖成功則設置當前節點爲首節點
                setHead(node);
                //以前頭節點的後續節點設置爲空,幫助gc
                p.next = null; // help GC
                //失敗標識設置爲false
                failed = false;
                return interrupted;
            }
            //若是節點的前驅節點不是頭節點或者嘗試加鎖失敗
            //shouldParkAfterFailedAcquire判斷當前節點線程是否須要阻塞
            //當能阻塞當前線程時,調用parkAndCheckInterrupt方法阻塞線程
            //若是被阻塞,當前自旋操做也走不下去了
            //線程被阻塞在自旋的這裏等待喚醒,這裏是很是須要注意的,釋放鎖後,喚醒的下一個節點的
            //代碼會當即運行到這一段,而後嘗試加鎖並把本身設置爲首節點,這樣就完成了首節點的變動
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //若是阻塞過程發生異常或其餘緣由致使失敗,將當前線程狀態設置爲CANCELED,同時從隊列中剔除
        if (failed)
            cancelAcquire(node);
    }
}

6.1.7.AbstractQueuedSynchronized.shouldParkAfterFailedAcquire方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //獲取當前節點前驅節點的狀態
    int ws = pred.waitStatus;
    //若是前驅節點狀態爲SIGNAL,當前節點線程能夠直接阻塞
    if (ws == Node.SIGNAL)
        return true;
    //若是前驅節點的狀態爲CANCELLED,從當前節點向前循環獲取第一個waitStatus不爲CANCELLED的節點,
    //並將找到的前驅節點和當前節點構成雙向隊列,剔除CANCELLED狀態的節點
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {//若是ws=0,設置當前節點的前驅節點的狀態爲SIGNAL
        //經過cas設置前驅節點的狀態爲SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

6.1.8.AbstractQueuedSynchronized.parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
    //阻塞當前線程,當前線程已被中斷
    LockSupport.park(this);
    //清除當前線程的中斷狀態並測試線程是否已被中斷,這裏返回true表示線程已經被中斷
    //雖然這裏清除了線程的中斷狀態,但acquire方法裏面調用了selfInterrupt()設置了線程的中斷狀態
    return Thread.interrupted();
}

6.1.9.AbstractQueuedSynchronized.cancelAcquire方法

取消加鎖操做,將當前Node從隊列中移除,構建新的隊列,移除構建新的隊列過程當中遍歷到的CANCELED狀態的節點,根據當前節點所處位置分析是否須要喚醒當前節點的後續節點。

這段代碼首先幹了這幾件事:

首先將當前節點的線程設置爲空,狀態設置爲CANCELED,從後向前找到當前節點第一個狀態不爲CANCELED的前驅節點,並把當前節點的前驅節點設置爲找到的這個節點。

再來就是構建新的雙向隊列,根據當前節點所處的位置有三種狀況:

(1).當前節點是尾節點。這種狀況咱們要作什麼呢?前面咱們已經從後向前獲取到了當前節點的前驅節點中第一個不爲CANCELED狀態的節點,那麼是否就須要將全部的CANCELED狀態的節點和當前節點剔除呢?沒錯,這裏面就作了這件事。把找到的前驅節點設置爲尾節點,把找到的前驅節點的後續節點設置爲空,這樣就造成了新的雙向隊列。

(2).當前節點不是尾節點,也不是head的後繼節點,對應的就是這個條件判斷pred != head。這種狀況節點就處於中間了。

相似結構:head<->node1<->node2<->...<->pred(searchNode)<->...(canceled狀態節點)<->currentNode<->next<->...<->tail,...(canceled狀態節點)<->currentNode這之間的節點都須要剔除,而後將pred(searchNode)<->next構成新的鏈表。

那麼這裏是否還須要將找到的pred節點的狀態設置爲SIGNAL呢,表明後續節點須要被喚醒。

(3).當前節點是head節點的後繼節點,須要喚醒當前節點的後繼節點,具體方法在unparkSuccessor中

private void cancelAcquire(Node node) {
    if (node == null)
        return;
 //將當前節點線程設置爲空
    node.thread = null;
 //獲取當前節點的前驅節點
    Node pred = node.prev;
    //從後向前循環,找到當前節點的前驅節點中第一個不爲CANCELED狀態的節點
    //並設置當前節點的前驅節點爲找到的節點
    while (pred.waitStatus > 0)
        //node.prev = pred->設置當前節點的前驅節點
        //pred = pred.prev
        node.prev = pred = pred.prev;

    //獲取找到的前驅節點的後續節點
    //多是CANCELLED狀態的節點,也多是當前節點本身
    Node predNext = pred.next;

    //設置當前節點的狀態爲CANCELLED
    node.waitStatus = Node.CANCELLED;

    //(1)
    //1.若是當前節點是尾節點
    //則設置找到的當前節點不爲CANCELLED狀態的節點爲尾節點,爲CANCELLED狀態的節點能夠從隊列中剔除了
    if (node == tail && compareAndSetTail(node, pred)) {
        //同時設置找到的當前節點不爲CANCELLED狀態的前驅節點的尾節點爲空,剔除隊列中CANCELLED狀態的節點
        compareAndSetNext(pred, predNext, null);
    } else {
        //(2)
        int ws;
        //2.當前節點不是head的next節點也不是尾節點
        //條件1:當前節點不是head的next節點也不是尾節點
        //條件2.1:(ws = pred.waitStatus) == Node.SIGNAL,說明當前節點找到的前驅節點狀態是SIGNAL
        //若是不是SIGNAL,也可能爲0
        //條件2.2:ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
        //若是前驅節點的狀態<=0,則設置找到的前驅節點的狀態爲SIGNAL,表示須要喚醒後繼節點
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            //獲取當前節點的後續節點
            Node next = node.next;
            //若是當前節點後續節點不爲空且狀態<=0
            if (next != null && next.waitStatus <= 0)
                //設置當前節點找到的前驅節點的後繼節點爲當前節點的next節點,剔除CANCELLED狀態的節點
                compareAndSetNext(pred, predNext, next);
        } else {
            //(3)
            //當前節點是head的next節點,須要喚醒當前節點的後繼節點,具體方法在unparkSuccessor中
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

6.1.10.AbstractQueuedSynchronized.unparkSuccessor方法

private void unparkSuccessor(Node node) {
 //獲取當前節點狀態
    int ws = node.waitStatus;
    //若是狀態是<0,則設置當前節點狀態爲0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
 //若是ws > 0
    //獲取當前節點的後續節點
    Node s = node.next;
    //若是後續節點空或者狀態爲CANCELED
    if (s == null || s.waitStatus > 0) {
        s = null;
        //從後向前找到最靠近當前節點的狀態<1的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //若是找到了這個節點,解除線程的阻塞
    if (s != null)
        LockSupport.unpark(s.thread);
}

6.2.釋放鎖過程分析

lock.unlock();

這段代碼主要調了AbstractQueuedSynchronized的release方法。

6.2.1.分析AbstractQueuedSynchronized的release方法

public final boolean release(int arg) {
    //嘗試釋放鎖,當state=0的時候返回true
    if (tryRelease(arg)) {
        Node h = head;
        //當首節點不爲空且首節點狀態不爲初始化的狀態的狀況,喚醒後續節點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//遞減c的值,該方法也是aqs交給子類實現的,這裏的具體實現是在Sync當中
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //當c的值遞減爲0的時候
    if (c == 0) {
        free = true;
        //設置獨佔線程爲空
        setExclusiveOwnerThread(null);
    }
    //設置state的值爲0
    setState(c);
    return free;
}

6.2.2.釋放鎖變動首節點的邏輯和next節點加鎖邏輯

acquireQueued方法裏面,全部處於阻塞過程當中的線程的代碼都卡在了自旋的這裏,若是當前首節點釋放鎖,當前首節點的next節點當即從自旋的代碼這裏恢復,並嘗試加鎖並把本身變成首節點,這樣就完成了已執行完的節點的出隊操做。並且本身成爲首節點執行完成釋放鎖時,也把本身的線程設置爲空了,而後繼續喚醒下一個節點,下一個節點的線程卡在自旋代碼塊中,而後解除阻塞繼續執行。

//acquireQueued方法的自旋邏輯,全部被阻塞的節點的線程都會被卡在這裏等待喚醒
for (;;) {
    //獲取當前節點前驅節點
    final Node p = node.predecessor();
    //若是當前節點爲前驅節點爲首節點,嘗試加鎖
    if (p == head && tryAcquire(arg)) {
        //加鎖成功則設置當前節點爲首節點
        setHead(node);
        //以前頭節點的後續節點設置爲空,幫助gc
        p.next = null; // help GC
        //失敗標識設置爲false
        failed = false;
        return interrupted;
    }
    //若是節點的前驅節點不是頭節點或者嘗試加鎖失敗
    //shouldParkAfterFailedAcquire判斷當前節點線程是否須要阻塞
    //當能阻塞當前線程時,調用parkAndCheckInterrupt方法阻塞線程
    //若是被阻塞,當前自旋操做也走不下去了
    //線程被阻塞在自旋的這裏等待喚醒,這裏是很是須要注意的,釋放鎖後,喚醒的下一個節點的
    //代碼會當即運行到這一段,而後嘗試加鎖並把本身設置爲首節點,這樣就完成了首節點的變動
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
}
相關文章
相關標籤/搜索