JDK1.5 以後發佈了JUC(java.util.concurrent),用於解決多線程併發問題。AQS 是一個特別重要的同步框架,不少同步類都藉助於 AQS 實現了對線程同步狀態的管理。java
AQS 中最主要的就是獨佔鎖和共享鎖的獲取和釋放,以及提供了一些可中斷的獲取鎖,超時等待鎖等方法。node
ReentranLock 是基於 AQS 獨佔鎖的一個實現。ReentrantReadWriteLock 是基於 AQS 共享鎖的一個讀寫鎖實現。原本打算一篇文章裏面寫完獨佔鎖和共享鎖,可是發現篇幅太長了,也不易於消化。安全
所以,本篇就先結合 ReentrantLock 源碼,分析 AQS 的獨佔鎖獲取和釋放。以及 ReentrantLock 的公平鎖和非公平鎖實現。數據結構
下一篇再寫 ReentrantReadWriteLock 讀寫鎖源碼,以及 AQS 共享鎖的獲取和釋放。多線程
在正式講解源碼以前,牆裂建議讀者作一些準備工做,最好對如下知識有必定的瞭解,這樣閱讀起來源碼會比較輕鬆(由於,我當初剛開始接觸多線程時,直接看 AQS 簡直是一臉懵逼,就像讀天書同樣。。)。併發
AQS 內部維護了一個 FIFO(先進先出)的雙向隊列。它的內部是用雙向鏈表來實現的,每一個數據節點(Node)中都包含了當前節點的線程信息,還有它的先後兩個指針,分別指向前驅節點和後繼節點。下邊看一下 Node 的屬性和方法:框架
static final class Node { //能夠認爲是一種標記,代表了這個 node 是以共享模式在同步隊列中等待 static final Node SHARED = new Node(); //也是一種標記,代表這個 node 是以獨佔模式在同步隊列中等待 static final Node EXCLUSIVE = null; /** waitStatus 常量值 */ //說明當前節點被取消,緣由有多是超時,或者被中斷。 //節點被取消的狀態是不可逆的,也就是說此節點會一直停留在取消狀態,不會轉變。 static final int CANCELLED = 1; //說明後繼節點的線程被 park 阻塞,所以當前線程須要在釋放鎖或者被取消時,喚醒後繼節點 static final int SIGNAL = -1; //說明線程在 condition 條件隊列等待 static final int CONDITION = -2; //在共享模式中用,代表下一個共享線程應該無條件傳播 static final int PROPAGATE = -3; //當前線程的等待狀態,除了以上四種值,還有一個值 0 爲初始化狀態(條件隊列的節點除外)。 //注意這個值修改時是經過 CAS ,以保證線程安全。 volatile int waitStatus; //前驅節點 volatile Node prev; //後繼節點 volatile Node next; //當前節點中的線程,經過構造函數初始化,出隊時會置空(這個後續說,重點強調) volatile Thread thread; //有兩種狀況。1.在 condition 條件隊列中的後一個節點 //2. 一個特殊值 SHARED 用於代表當前是共享模式(由於條件隊列只存在於獨佔模式) Node nextWaiter; //是不是共享模式,理由同上 final boolean isShared() { return nextWaiter == SHARED; } //返回前驅節點,若是爲空拋出空指針 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
另外,在 AQS 類中,還會記錄同步隊列的頭結點和尾結點:函數
//同步隊列的頭結點,是懶加載的,即不會當即建立一個同步隊列, //只有當某個線程獲取不到鎖,須要排隊的時候,纔會初始化頭結點 private transient volatile Node head; //同步隊列的尾結點,一樣是懶加載。 private transient volatile Node tail;
這部分就結合 ReentrantLock 源碼分析 AQS 的獨佔鎖是怎樣得到和釋放鎖的。源碼分析
首先,咱們從 ReentrantLock 開始分析,它有兩個構造方法,一個構造,能夠傳入一個 boolean 類型的參數,代表是用公平鎖仍是非公平鎖模式。另外一個構造方法,不傳入任何參數,則默認用非公平鎖。學習
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
NonfairSync 和 FairSync 都繼承自 Sync ,它們都是 ReentranLock 的內部類。 而Sync 類又繼承自 AQS (AbstractQueuedSynchronizer)。
static final class NonfairSync extends Sync { } static final class FairSync extends Sync { } abstract static class Sync extends AbstractQueuedSynchronizer { }
知道了它們之間的繼承關係,咱們就從非公平鎖的加鎖方法做爲入口,跟蹤源碼。由於非公平鎖的流程講明白以後,公平鎖大體流程都同樣,只是多了一個條件判斷(這個,一下子後邊細講,會作對比)。
NonfairSync.lock
咱們看下公平鎖的獲取鎖的方法:
final void lock() { //經過 CAS 操做把 state 設置爲 1 if (compareAndSetState(0, 1)) //若是設值成功,說明加鎖成功,保存當前得到鎖的線程 setExclusiveOwnerThread(Thread.currentThread()); else //若是加鎖失敗,則執行 AQS 的acquire 方法 acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire
這個方法的邏輯是:
tryAcquire
這是一個模板方法,具體的實現須要看它的子類,這裏對應的就是 ReentrantLock.NonfairSync.tryAcquire 方法。咱們看一下:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { //當前線程 final Thread current = Thread.currentThread(); //獲取當前的同步狀態,若爲 0 ,表示無鎖狀態。若大於 0,表示已經有線程搶到了鎖。 int c = getState(); if (c == 0) { //而後經過 CAS 操做把 state 的值改成 1。 if (compareAndSetState(0, acquires)) { // CAS 成功以後,保存當前得到鎖的線程 setExclusiveOwnerThread(current); return true; } } // 若是 state 大於0,則判斷當前線程是不是得到鎖的線程,是的話,可重入。 else if (current == getExclusiveOwnerThread()) { //因爲 ReentrantLock 是可重入的,因此每重入一次 state 就加 1 。 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
addWaiter
若是獲取鎖失敗以後,就會調用 addWaiter 方法,把當前線程加入同步隊列。
private Node addWaiter(Node mode) { //把當前線程封裝成 Node ,而且是獨佔模式 Node node = new Node(Thread.currentThread(), mode); //嘗試快速入隊,若是失敗,則會調用 enq 入隊方法。enq 會初始化隊列。 Node pred = tail; //若是 tail 不爲空,說明當前隊列中已經有節點 if (pred != null) { //把當前 node 的 prev 指針指向 tail node.prev = pred; //經過 CAS 把 node 設置爲 tail,即添加到隊尾 if (compareAndSetTail(pred, node)) { //把舊的 tail 節點的 next 指針指向當前 node pred.next = node; return node; } } //當 tail 爲空時,把 node 添加到隊列,若是須要的話,先進行隊列初始化 enq(node); //入隊成功以後,返回當前 node return node; }
enq
經過自旋,把當前節點加入到隊列中
private Node enq(final Node node) { for (;;) { Node t = tail; //若是 tail爲空,說明隊列未初始化 if (t == null) { //建立一個空節點,經過 CAS把它設置爲頭結點 if (compareAndSetHead(new Node())) //此時只有一個 head頭節點,所以把 tail也指向它 tail = head; } else { //第二次自旋時,tail不爲空,因而把當前節點的 prev指向 tail節點 node.prev = t; //經過 CAS把 tail節點設置爲當前 node節點 if (compareAndSetTail(t, node)) { //把舊的 tail節點的 next指向當前 node t.next = node; return t; } } } }
acquireQueued
入隊成功以後,就會調用 acquireQueued 方法自旋搶鎖。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取當前節點的前驅節點 final Node p = node.predecessor(); //若是前驅節點就是 head 節點,就調用 tryAcquire 方法搶鎖 if (p == head && tryAcquire(arg)) { //若是搶鎖成功,就把當前 node 設置爲頭結點 setHead(node); p.next = null; // help GC failed = false; //搶鎖成功後,會把線程中斷標誌返回出去,終止for循環 return interrupted; } //若是搶鎖失敗,就根據前驅節點的 waitStatus 狀態判斷是否須要把當前線程掛起 if (shouldParkAfterFailedAcquire(p, node) && //線程被掛起時,判斷是否被中斷過 parkAndCheckInterrupt()) //注意此處,若是被線程被中斷過,須要把中斷標誌從新設置一下 interrupted = true; } } finally { if (failed) //若是拋出異常,則取消鎖的獲取,進行出隊操做 cancelAcquire(node); } }
setHead
經過代碼,咱們能夠看到,當前的同步隊列中,只有第二個節點纔有資格搶鎖。若是搶鎖成功,則會把它設置爲頭結點。
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
須要注意的是,這個方法,會把頭結點的線程設置爲 null 。想一下,爲何?
由於,此時頭結點的線程已經搶鎖成功,須要出隊了。天然的,隊列中也就不該該存在這個線程了。
PS:由 enq 方法,還有 setHead 方法,咱們能夠發現,頭結點的線程老是爲 null。這是由於,頭結點要麼是剛初始化的空節點,要麼是搶到鎖的線程出隊了。所以,咱們也經常把頭結點叫作虛擬節點(不存儲任何線程)。
shouldParkAfterFailedAcquire
以上是搶鎖成功的狀況,那麼搶鎖失敗了呢?這時,咱們須要判斷是否應該把當前線程掛起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取當前節點的前驅節點的 waitStatus int ws = pred.waitStatus; if (ws == Node.SIGNAL) //若是 ws = -1 ,說明當前線程能夠被前驅節點正常喚醒,因而就能夠安全的 park了 return true; if (ws > 0) { //若是 ws > 0,說明前驅節點被取消,則會從當前節點依次向前查找, //直到找到第一個沒有被取消的節點,把那個節點的 next 指向當前 node //這一步,是爲了找到一個能夠把當前線程喚起的前驅節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //若是 ws 爲 0,或者 -3(共享鎖狀態),則把它設置爲 -1 //返回 false,下次自旋時,就會判斷等於 -1,返回 true了 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt
若是 shouldParkAfterFailedAcquire 返回 true,說明當前線程須要被掛起。所以,就執行此方法,同時檢查線程是否被中斷。
private final boolean parkAndCheckInterrupt() { //把當前線程掛起,則 acquireQueued 方法的自旋就會暫停,等待前驅節點 unpark LockSupport.park(this); //返回當前節點是否被中斷的標誌,注意此方法會把線程的中斷標誌清除。 //所以,返回上一層方法時,須要設置 interrupted = true 把中斷標誌從新設置,以便上層代碼能夠處理中斷 return Thread.interrupted(); }
想一下,爲何搶鎖失敗後,須要判斷是否把線程掛起?
由於,若是搶不到鎖,而且還不把線程掛起,acquireQueued 方法就會一直自旋下去,這樣你的CPU能受得了嗎。
cancelAcquire
當不停的自旋搶鎖時,若發生了異常,就會調用此方法,取消正在嘗試獲取鎖的線程。node 的位置分爲三種狀況,見下面註釋,
private void cancelAcquire(Node node) { if (node == null) return; // node 再也不指向任何線程 node.thread = null; Node pred = node.prev; //從當前節點不斷的向前查找,直到找到一個有效的前驅節點 while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; //把 node 的 ws 設置爲 -1 node.waitStatus = Node.CANCELLED; // 1.若是 node 是 tail,則把 tail 更新爲 node,並把 pred.next 指向 null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; //2.若是 node 既不是 tail,也不是 head 的後繼節點,就把 node的前驅節點的 ws 設置爲 -1 //最後把 node 的前驅節點的 next 指向 node 的後繼節點 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //3.若是 node是 head 的後繼節點,則直接喚醒 node 的後繼節點。 //這個也很好理解,由於 node 是隊列中惟一有資格嘗試獲取鎖的節點, //它放棄了資格,固然有義務把後繼節點喚醒,以讓後繼節點嘗試搶鎖。 unparkSuccessor(node); } node.next = node; // help GC } }
unparkSuccessor
這個喚醒方法就比較簡單了,
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //從尾結點向前依次遍歷,直到找到距離當前 node 最近的一個有效節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //把這個有效節點的線程喚醒, //喚醒以後,當前線程就能夠繼續自旋搶鎖了,(回到 park 的地方) LockSupport.unpark(s.thread); }
下面畫一個流程圖更直觀的查看整個獲取鎖的過程。
公平鎖和非公平鎖的總體流程大體相同,只是在搶鎖以前先判斷一下是否已經有人排在前面,若是有的話,就不執行搶鎖。咱們經過源碼追蹤到 FairSync.tryAcquire 方法。會發現,多了一個 hasQueuedPredecessors 方法。
hasQueuedPredecessors
這個方法判斷邏輯稍微有點複雜,有多種狀況。
public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
總結:以上幾種狀況,只有最終返回 false 時,纔會繼續往下執行。由於 false,說明沒有線程排在當前線程前面,因而經過 CAS 嘗試把 state 值設置爲 1。若成功,則方法返回。若失敗,一樣須要去排隊。
舉個例子來對比公平鎖和非公平鎖。好比,如今到飯點了,你們都到食堂打飯。把隊列中的節點比做排隊打飯的人,每一個打飯窗口都有一個管理員,只有排隊的人從管理員手中搶到鎖,纔有資格打飯。打飯的過程就是線程執行的過程。
若是,你發現前面沒有人在排隊,那麼就能夠直接從管理員手中拿到鎖,而後打飯。對於公平鎖來講,若是你前面有人在打飯,那麼你就要排隊到他後面(圖中B),等他打完以後,把鎖還給管理員。那麼,你就能夠從管理員手中拿到鎖,而後打飯了。後面的人依次排隊。這就是FIFO先進先出的隊列模型。
對於非公平鎖來講,若是你是圖中的 B,當 A 把鎖還給管理員後,有可能有另一個 D 插隊過來直接把鎖搶走。那麼,他就能夠打飯,你只能繼續等待了。
因此,能夠看出來。公平鎖是嚴格按照排隊的順序來的,先來後到嘛,你來的早,就能夠早點獲取鎖。優勢是,這樣不會形成某個線程等待時間過長,由於你們都是中規中矩的在排隊。而缺點呢,就是會頻繁的喚起線程,增長 CPU的開銷。
非公平鎖的優勢是吞吐量大,由於有可能正好鎖可用,而後線程來了,直接搶到鎖了,不用排隊了,這樣也減小了 CPU 喚醒排隊線程的開銷。 可是,缺點也很明顯,你說我排隊排了好長時間了,終於輪到我打飯了,憑什麼其餘人剛過來就插到我前面,比我還先打到飯,也太不公平了吧,後邊一大堆排隊的人更是怨聲載道。這要是每一個人來了都插到我前面去,我豈不是要餓死了。
咱們從 ReentrantLock 的 unlock 方法看起:
public void unlock() { //調用 AQS 的 release 方法 sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //若是頭結點不爲空,而且 ws 不爲 0,則喚起後繼節點 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
這段邏輯比較簡單,當線程釋放鎖以後,就會喚醒後繼節點。 unparkSuccessor 已講,再也不贅述。而後看下 tryRelease 方法,公平鎖和非公平鎖走的是同一個方法。
protected final boolean tryRelease(int releases) { //每釋放一次鎖,state 值就會減 1,由於以前可能有鎖的重入 int c = getState() - releases; //若是當前線程不是搶到鎖的線程,則拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //只有 state 的值減到 0 的時候,纔會所有釋放鎖 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
由於,ReentrantLock 支持鎖的重入,因此每次重入 state 值都會加 1,相應的每次釋放鎖, state 的值也會減 1 。因此,這也是爲何每一個 lock 方法最後都要有一個 unlock 方法釋放鎖,它們的個數須要保證相同。
當 state 值爲 0 的時候,說明鎖徹底釋放。其餘線程才能夠有機會搶到鎖。
以上已經講解了獨佔鎖主要的獲取方法 acquire ,另外還有一些其餘相關方法,再也不贅述,由於主要邏輯都是同樣的,只有部分稍有不一樣,只要理解了 acquire ,這些都是相通的。如 acquireInterruptibly 方法,它能夠在獲取鎖的時候響應中斷。還有超時獲取鎖的方法 doAcquireNanos 能夠設定獲取鎖的超時時間,超時以後就返回失敗。
下篇預告:分析 ReentrantReadWriteLock 讀寫鎖源碼,以及 AQS 共享鎖的獲取和釋放,敬請期待。
若是本文對你有用,歡迎點贊,評論,轉發。
學習是枯燥的,也是有趣的。我是「煙雨星空」,歡迎關注,可第一時間接收文章推送。