JDK核心源碼之ReenrantLocak源碼註釋

 

        AQS的java的同窗面試常常被問到的一個問題。不少同窗被面到這個問題的時候,一臉蒙圈。可是說實話,這個AQS對於java的同窗來講應該是一個比較重要的知識,由於咱們不少併發的對象都是基於這個實現的,因此考察java同窗的併發知識的功底,問這個AQS也是一個質量比較好的問題。java

         AQS的全稱是AbstractQueuedSynchronizer(文中就叫:抽象隊列),說白了其實就是一個抽象類。node

是咱們併發包裏面的基石,重入鎖,讀寫鎖,不少併發的工具都是基於它實現的。因此理解好AQS是什麼東西對於掌握好併發知識是有幫助的。面試

     不過,咱們若是分析AQS的時候,直接就讀AQS的代碼有點無聊,咱們就分析ReentantLock的源碼,由於咱們平時真正使用的時候咱們使用的是ReentantLock,而不是AQS。可是ReentantLock就是基於AQS實現的。併發

     在分析AQS以前,我先給你們解析一下AQS的原理。以下圖所示:工具

圖展現的一個ReentantLocak(基於非公平鎖分析)加鎖的過程。ui

  1. 首先是線程一過來了,state的初始值是0,當前線程爲null。由於歷來沒有人加過鎖,因此線程一很容易就加鎖成功了。加鎖成功了就是把state0變爲1,當前線程設置爲本身的名字,好比就是線程1.
  2. 接下來,線程一再次進來進行重入加鎖,發現當前的state1,可是當前線程是本身,那麼直接就讓state1,由1變爲2,這樣就完成了可重入加鎖。
  3. 接着線程二進來了,線程二進來發現state2,可是當前線程不是本身,因此加鎖就失敗了。

加鎖失敗了之後線程二就加入到隊列裏面去(AQS內部實現了一個雙向隊列)this

  1. 再接着線程三就進來了,遇到的情況跟線程二同樣,因此跟着也就到了隊列裏面了。

 

AQS重入加鎖大概就是這麼個原理。咱們接下來分析一下它的底層源碼。我就以ReentantLocak爲例寫個例子。spa

public class ReentrantLockDemo {
    ReentrantLock reentrantLock = new ReentrantLock();
    int sum=0;
    public void count(){
//這個是加鎖的代碼
//咱們這次主要基於非公平鎖分析源碼,等到合適的時機再給你們
//解釋公平鎖和非公平鎖的區別。
//首先咱們開始分析lock的方法。
        reentrantLock.lock();
        for (int i = 0; i < 10; i++) {
            sum++;
            System.out.println(sum);
        }
//這個是咱們釋放鎖的代碼
        reentrantLock.unlock();
    }
    }
}
final void lock() {
//線程一第一次進來之間使用cas操做修改state的值
//這句代碼的語義就是當前state的值是否爲0,若是是0,那麼就把修改成1。
    if (compareAndSetState(0, 1))
//設置當前線程爲本身,其實線程一第一次進來加鎖的時候,到這兒就加鎖成功了!!
        setExclusiveOwnerThread(Thread.currentThread());
    else
//若是線程1第二次進來
//那麼由於state不是0了,因此會cas操做失敗
//因此會走這個方法 
        acquire(1);
}

接下來咱們分析一下,線程一第二次進來是如何加鎖的:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
咱們慢慢分析這段代碼
首先應該分析的是:tryAcquire(arg)方法

因此咱們要想分析acquire方法,那麼先分析裏面的tryAcouire方法

//執行的是這個方法
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
這個方法調用的是以下的方法:
final boolean nonfairTryAcquire(int acquires) {
//獲取當前線程,當前線程固然是線程一嘍
    final Thread current = Thread.currentThread();
    //獲取當前的state 那麼state 是1
int c = getState();
//若是c == 0
//其實咱們知道,代碼之因此走到這兒就是由於前面c != 0
//可是jdk的源碼爲了健壯性,因此這兒仍是再次判斷了一下
//意思就是若是當前的state是0,那麼直接加鎖就能夠了。
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
//若是發現上一個加鎖也是本身
//那麼直接進行重入加鎖就能夠了
    else if (current == getExclusiveOwnerThread()) {
// 1+1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //修改狀態值爲2,可重入加鎖成功,返回true.
setState(nextc);
        return true;
    }
    return false;
}




若是上個結果返回true以後。咱們再回過頭來看這個方法:
public final void acquire(int arg) {
//(!true) 結果就是false
//那麼代碼就不繼續執行了
//也就是說若是是重入加鎖,那麼這兒加鎖成功之後就退出去了。
//換句話說,若是是重入加鎖,代碼執行到這兒重入加鎖也就成功了!!
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如今咱們的的 線程是1,state=2
接下來咱們在分析一個狀況,線程2進來了。咱們的代碼又是如何走的。
首先state這個時候不是0了,那麼直接走的是acquire方法。
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
//直接走這個方法
        acquire(1);
}

//咱們再次分析這段代碼
//咱們分析一下這個方法。tryAcquire
//經過咱們前面的分析咱們知道,若是重入加鎖成功了,那麼這兒直接返回的是true
//可是若是發現當前的線程 和 裏面加鎖的線程不是同一個線程
//那麼重入加鎖失敗。這兒就會返回來false
//若是tryAcquire返回的是false。那麼  (!false) = true
//代碼就會運行到acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//接下來咱們分析一下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 這個方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}



分析acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 這個方法
咱們首先須要分析addWariter這個方法。注意Node.EXCLUSIVE這個值是null


//進入到這個方法的時候咱們進入到了 類 AbstractQueuedSynchonzied裏面。
private Node addWaiter(Node mode) {
//根據當前線程建立了一個Node
	//這個node的nextWaiter = mode = null
    Node node = new Node(Thread.currentThread(), mode);
    //tail一開始就是等於mull  
Node pred = tail;
//因此第一次進來等式不成立
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//第一次進來代碼走的是這兒
    enq(node);
    return node;
}



//調的這個方法,建立來的參數的是當前線程的Node
private Node enq(final Node node) {
    for (;;) {//自旋
//第一次指針有變化
//第二次進來指針仍是有變化
        Node t = tail;
//注意咱們第一次進來,知足這個條件,因此t==null,是知足條件的
//第二次進來,那麼這個時候t就不等於Null了,因此這兒的這個條件就不知足了
//去執行else語句
        if (t == null) { // Must initialize
//使用cas設置一個head頭
            if (compareAndSetHead(new Node()))
//指針要發生變化
                tail = head;
//接着代碼就會執行到這兒,你們必定要注意這兒。
//代碼執行到這兒之後,由於這是一個死循環,因此
//執行到這兒之後再次運行。
        } else {
//當前的線程的prev指向t
            node.prev = t;
//使用cas操做設置隊列的尾部
//這個cas的意思是當前的tail是否就是t,若是是t
//那麼就把值修改成當前node,那很明顯,當前的tail就是t
//因此這個就把當前node設置爲tail
            if (compareAndSetTail(t, node)) {
                t.next = node;
//返回頭結點結束這個死循環
                return t;
            }
        }
    }
}
到目前爲止指針變化以下:

 

 

接下來假設線程三要進來了。
若是線程三進來,確定就會走到這段代碼。
private Node addWaiter(Node mode) {
//建立線程三Node
    Node node = new Node(Thread.currentThread(), mode);
    //pred指向tail
Node pred = tail;
//此次pred就不等於null
    if (pred != null) {
//當前的node.pred指向 tail
        node.prev = pred;
//判斷當前pred是否是tail,若是是
//就把當前node設置爲tail,當前tail確定就是pred
        if (compareAndSetTail(pred, node)) {
//pred.netxt指向了線程三的node
            pred.next = node;
//返回當前node
            return node;
        }
    }
    enq(node);
    return node;
}

致使到此隊列的指針變化以下:

 

 

到目前爲止咱們分析清楚了addWaiter的方法,可是不要忘記了咱們的目標。咱們是分析
acquireQueued的這個方法。咱們只須要知道若是加鎖失敗了,那麼就會調用addWaiter方法,addWaiter方法返回來的是當前的node
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
//須要注意一下咱們這兒是死循環
        for (;;) {
//獲取當前節點的上一個節點,那麼咱們線程三的
//上一個節點是線程二
            final Node p = node.predecessor();
//線程二不知足這兒條件
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//若是上面一個條件不知足的話,接下來就會調用以下的方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


//傳進來的參數是上一個節點和當前節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//當一個節點剛牀架你的時候 waitStatus默認值應該是0
    int ws = pred.waitStatus;
//這個條件不知足
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
//這個條件不知足
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
//那麼就會直接執行這個方法。
//這個方法就會把上一個節點的waitStatus 設置爲SINGAL
//也就是-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
//而後返回false
    return false;
}

 

咱們再回過頭來分析上一段代碼:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//剛剛咱們知道這兒的返回值是false
//那麼若是是false的話,if條件就不知足了。
//不知足了之後再次執行for循環。
//繼續執行shouldParkAfterFailedAcquire方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
當前的隊列指針狀況以下:

 

 

//再次執行這個方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取狀態,如今ws=-1
    int ws = pred.waitStatus;
//符合這個提交
    if (ws == Node.SIGNAL)
//結果返回true
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//這個shouldParkAfterFailedAcquire方法結果爲true了之後
//那麼接下來執行parkAndCheckInterrupt方法
//因此接下來咱們分析一下parkAndCheckInterrupt方法。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
//這兒操做就比較簡單了,這兒就直接把線程掛起,線程就停在這兒不動了
//必需要等另一個線程去執行unpark操做代碼才能往下執行。
    LockSupport.park(this);
    return Thread.interrupted();
}

由於代碼執行到這兒就已經卡住了。因此咱們回到源頭看到如下,最終會讓哪段代碼卡住。

//這個地方是調動acquireQueued方法致使代碼卡住,因此這兒的代碼也會卡住不動。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

到目前爲止咱們分析了一個加鎖失敗的線程進去到隊列之後的狀況。.net

咱們如今能夠解釋一下公平鎖和非公平鎖的區別了。
咱們以前的全部的代碼分析的都是非公平鎖的,非公平鎖最大的特色就是在這兒。線程

final void lock() {
//加鎖的時候不分青紅皁白,也無論隊列裏面是否有人在排着隊
//上來就是直接加鎖,因此咱們想一下,假設咱們雖然如今隊列裏面有線程在排隊加鎖
//可是恰好當前的獨佔鎖釋放鎖了,新進來的這個線程就加鎖成功了。也就是插隊成功了。
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

若是是公平鎖的話,加鎖的時候走的是這個邏輯:
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
//調用這個方法
        acquire(1);
    }

public final void acquire(int arg) {
//首先執行的是這兒的tryAcquire方法
//其實到這兒的代碼跟咱們以前看到的代碼是同樣的。
//可是在往下實現就跟非公平鎖那兒不同了
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

 

protected final boolean tryAcquire(int acquires) {
//獲取當前線程
        final Thread current = Thread.currentThread();
        int c = getState();
//若是真的當前的state=0
        if (c == 0) {
//須要這兒進行判斷,咱們先單獨把hasQueuedPredecessors
//代碼拿出來分析一下,分析後獲得,這個方法是判斷隊列裏面
//是否還有節點了。若是還有節點,那麼這個方法就返回true
//!true 就是false,代碼就不走這兒了。
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
//這兒返回false
        return false;
    }
}


public final boolean hasQueuedPredecessors() {
 
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
//若是隊列裏面還有節點
//若是還有節點那麼就返回來true
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

 

這樣的話,咱們再回過頭來看這段代碼:
public final void acquire(int arg) {
// tryAcquire方法返回來的是false,那麼!flase的結果就是等於true
    if (!tryAcquire(arg) &&
//而後接下來就是走這個方法,那麼這個方法,就是跟咱們分析的同樣了。
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
因此綜上所述,咱們的公平隊列就是每次在加鎖的時候,先判斷隊列裏面是否有線程,若是有就加到隊列後面,若是沒有,那麼就直接加鎖成功。


接下來咱們再分析一個場景,就是重入鎖釋放鎖的邏輯。
public void unlock() {
    sync.release(1);
}

接下來調用的是這段代碼:
public final boolean release(int arg) {
//重點調用的是這個方法
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

 

protected final boolean tryRelease(int releases) {
//加鎖如今是咱們的線程一進行重入鎖的釋放,一開始state的值2
//如今傳進來的參數releases 是1
//那麼c的值是1
    int c = getState() - releases;
//若是釋放鎖的線程不是當前獨佔鎖的線程,那麼就會報錯。
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
//若是把整個重入鎖都釋放完了,那麼其實c==0
//可是第一次釋放重入鎖的時候,這兒c是1
    if (c == 0) {
        free = true;
//若是整個重入鎖都釋放了,那麼就放當前的獨佔鎖置爲null
        setExclusiveOwnerThread(null);
    }
//更改線程的重入鎖的個數
    setState(c);
//若是整個鎖都釋放完了,那麼返回的是true
//若是隻是釋放了一部分,那麼返回的是false。
    return free;
}

 

 

public final boolean release(int arg) {
//若是線程一有兩個鎖重入,當前只是減小了一個
//鎖重入,那麼tryRelease返回值是false。那麼這個條件就不知足
  if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
//對兩個鎖重入,由於第一次進來,那麼直接返回的是false。
    return false;
}

其實若是是隻釋放了線程一的第一個重入鎖,那麼這個返回值也沒什麼意義,咱們看到的是,就是把state的值減一了。
接下來咱們繼續分析,若是線程一,再釋放一個重入鎖,也就是state由1變爲0了。
咱們回過頭來再分析以下代碼:
public final boolean release(int arg) {
//條件知足
    if (tryRelease(arg)) {
        Node h = head;
//我這兒的分析是h.waitStatus就是等於0
//可是若是這個等於0的話,咱們的等於就走不下去了,可見這兒的值應該不等於零。
//這樣咱們就執行裏面的uparkSuccessor方法
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
//獲取waitStatus狀態
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
//獲取到第一個節點
    Node s = node.next;
//在目前咱們虛擬的環境中,s!=null
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
      //這兒直接unpakr把線程喚醒
        LockSupport.unpark(s.thread);
}
接下來咱們分析一下unpank之後代碼如何走:
其實咱們的代碼以前卡住了,而後unpark之後會致使代碼繼續往下執行。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
           //以前咱們的代碼卡在這個地方。
           //由於如今線程被喚醒了,因此這兒就能夠繼續往下執行。
           //咱們要注意這兒是一個死循環。
           //因此繼續重複執行。
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
//重複執行到這兒,獲取當前節點的上一個節點
//當前節點的上一個節點固然是head了
            final Node p = node.predecessor();
//p==head條件知足,因此接着就執行tryAcquire
//這個方法就 開始加鎖了,修改當前加鎖線程的名字
//把state 改成了1
            if (p == head && tryAcquire(arg)) {
//而後把當前線程的node設置爲head
                setHead(node);
                //把當前線程的Node 置爲null進行垃圾回收
                p.next = null; // help GC
                failed = false;
               //返回狀態
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
相關文章
相關標籤/搜索