Java中的同步類 ReentrantLock是基於AbstractQueuedSynchronizer(簡稱爲 AQS)實現的。今天從源碼來了解下ReentrantLock中非公平鎖的加鎖和釋放鎖(ReentrantLock中支持公平鎖和非公平鎖,默認是非公平鎖的,但能夠經過建立ReentrantLock對象時傳入參數指定使用公平鎖)。html
在瞭解ReentrantLock前,須要對AQS有必定的瞭解,不然在學習時會比較困難的,而且在經過源碼學習ReentrantLock時也會穿插着講解AQS內容。java
AQS中提供了一個int類型的state變量,而且state變量被volatile修飾,表示state變量的讀寫操做能夠保證原子性;而且AQS還提供了針對state變量的讀寫方法,以及使用CAS算法更新state變量的方法。 AQS使用state變量這個狀態變量來實現同步狀態。node
①、源碼展現算法
/** * The synchronization state. */ private volatile int state; /** * get 獲取state變量值 */ protected final int getState() { return state; } /** * set 更新state變量值 * @param newState 新的狀態變量值 */ protected final void setState(int newState) { state = newState; } /** * 使用CAS算法更新state變量值; 當從共享內存中讀取出的state變量值與expect指望值一致的話, * 就將其更新爲update值。使用CAS算法保證其操做的原子性 * * @param expect 指望值 * @param update 更新值 */ protected final boolean compareAndSetState(int expect, int update) { // 使用Unsafe類的本地方法來實現CAS return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
多個線程同時競爭AQS的state同步狀態,在同一時刻只能有一個線程獲取到同步狀態(獲取到鎖),那其它沒獲取到鎖的線程該怎麼辦呢app
它們會進去到一個同步隊列中,在隊列中等待同步鎖的釋放;這個同步隊列是一個基於鏈表的雙向隊列, 基於鏈表的話,就會存在Node節點,那麼AQS中節點是怎麼實現的呢源碼分析
①、Node節點:學習
AQS中本身實現了一個內部Node節點類,Node節點類中定義了一些屬性,下面來簡單說說屬性的意思:ui
static final class Node { // 標誌在同步隊列中Node節點的模式,共享模式 static final Node SHARED = new Node(); // 標誌在同步隊列中Node節點的模式,獨佔(排他)模式 static final Node EXCLUSIVE = null; // waitStatus值爲1時表示該線程節點已釋放(超時等),已取消的節點不會再阻塞。 static final int CANCELLED = 1; // waitStatus值爲-1時表示當此節點的前驅結點釋放鎖時,而後當前節點中的線程就能夠去獲取鎖運行 static final int SIGNAL = -1; /** * waitStatus爲-2時,表示該線程在condition隊列中阻塞(Condition有使用), * 當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從 * 等待隊列轉移到同步隊列中,等待獲取同步鎖。 */ static final int CONDITION = -2; /** * waitStatus爲-3時,與共享模式有關,在共享模式下,該狀態表示可運行 * (CountDownLatch中有使用)。 */ static final int PROPAGATE = -3; /** * waitStatus:等待狀態,指的是當前Node節點中存放的線程的等待狀態, * 等待狀態值就是上面的四個狀態值:CANCELLED、SIGNAL、CONDITION、PROPAGATE */ volatile int waitStatus; /** * 由於同步隊列是雙向隊列,那麼每一個節點都會有指向前一個節點的 prev 指針 */ volatile Node prev; /** * 由於同步隊列是雙向隊列,那麼每一個節點也都會有指向後一個節點的 next 指針 */ volatile Node next; /** * Node節點中存放的阻塞的線程引用 */ volatile Thread thread; /** * 當前節點與其next後繼結點的所屬模式,是SHARED共享模式,仍是EXCLUSIVE獨佔模式, * * 注:好比說當前節點A是共享的,那麼它的這個字段是shared,也就是說在這個等待隊列中, * A節點的後繼節點也是shared。 */ Node nextWaiter; /** * 獲取當前節點是否爲共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 獲取當前節點的 prev前驅結點 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } // 在後面的addWaiter方法會使用到,線程競爭state同步鎖失敗時,會建立Node節點存放thread Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
②、同步隊列結構圖(雙向隊列):this
經過前面兩點,能夠了解到AQS的原理究竟是什麼了,總結爲一句話:AQS使用一個Volatile的int類型的成員變量來表示同步狀態,經過內置的FIFO隊列來完成資源獲取的排隊工做,經過CAS完成對State值的修改。spa
而後再來一張圖,使得理解更加深入:
圖片來源:Java技術之AQS詳解
好了, AQS暫時能夠先了解到這裏了,知道這些後,在後面瞭解 ReentrantLock時就會變的容易些,而且後面經過源碼學習 ReentrantLock時,因爲會使用到AQS的模版方法,因此也會講解到AQS的內容。
在瞭解ReentrantLock以前,先將ReentrantLock與Synchronized進行比較下,這樣能夠更加了解ReentrantLock的特性,也有助於下面源碼的閱讀;
建立一個ReentrantLock對象,在建立對象時,若是不指定公平鎖的話,默認是非公平鎖;
①、簡單瞭解下什麼是公平鎖,什麼是非公平鎖?
公平鎖:按照申請同步鎖的順序來獲取鎖;非公平鎖:不會按照申請鎖的順序獲取鎖,存在鎖的搶佔;
注:後面會經過源碼瞭解下非公平鎖和公平鎖是怎樣獲取鎖的。
②、源碼以下:
// 默認是非公平的鎖 ReentrantLock lock = new ReentrantLock(); // 構造方法默認建立了一個 NonfairSync 非公平鎖對象 public ReentrantLock() { // NonfairSync繼承了Sync類,Sync類又繼承了AQS類 sync = new NonfairSync(); } // 傳入參數 true,指定爲公平鎖 ReentrantLock lock = new ReentrantLock(true); // 傳入參數的構造方法,當fair爲true時,建立一個公平鎖對象,不然建立一個非公平鎖對象 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
①、開始先經過一個簡單流程圖來看下獨佔模式下加鎖的流程:
圖片來源:美團技術團隊
②、源碼分析:加鎖時首先使用CAS算法嘗試將state狀態變量設置爲1,設置成功後,表示當前線程獲取到了鎖,而後將獨佔鎖的擁有者設置爲當前線程;若是CAS設置不成功,則進入Acquire方法進行後續處理。
final void lock() { // 使用CAS算法嘗試將state狀態變量設置爲1 if (compareAndSetState(0, 1)) // 設置成功後,表示當前線程獲取到了鎖,而後將獨佔鎖的擁有者設置爲當前線程 setExclusiveOwnerThread(Thread.currentThread()); else // 進行後續處理,會涉及到重入性、建立Node節點加入到隊列尾等 acquire(1); }
③、探究下acquire(1) 方法裏面是什麼呢 acquire(1) 方法是AQS提供的方法:
public final void acquire(int arg) { /** * 使用tryAcquire()方法,讓當前線程嘗試獲取同步鎖,獲取成的話,就不會執行後面的acquireQueued() * 方法了,這是因爲 && 邏輯運算符的特性決定的。 * * 若是使用tryAcquire()方法獲取同步鎖失敗的話,就會繼續執行acquireQueued()方法,它的做用是 * 一直死循環遍歷同步隊列,直到使addWaiter()方法建立的節點中線程獲取到鎖。 * * 若是acquireQueued()返回的true,這個true不是表明成功的獲取到鎖,而是表明當前線程是否存在 * 中斷標誌,若是存在的話,在獲取到同步鎖後,須要使用selfInterrupt()對當前線程進行中斷。 */ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1)、tryAcquire(arg) 方法源碼解讀:NonfairSync 非公平鎖中重寫了AQS的tryAcquire()方法
final boolean nonfairTryAcquire(int acquires) { // 當前線程 final Thread current = Thread.currentThread(); // 獲取當前state同步狀態變量值,因爲使用volatile修飾,單獨的讀寫操做具備原子性 int c = getState(); // 若是狀態值爲0 if (c == 0) { // 使用compareAndSetState方法這個CAS算法嘗試將state同步狀態變量設置爲1 獲取同步鎖 if (compareAndSetState(0, acquires)) { // 而後將獨佔鎖的擁有者設置爲當前線程 setExclusiveOwnerThread(current); return true; } } // 若是擁有獨佔鎖的的線程是當前線程的話,表示當前線程須要重複獲取鎖(重入鎖) else if (current == getExclusiveOwnerThread()) { // 當前同步狀態state變量值加1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 寫入state同步狀態變量值,因爲使用volatile修飾,單獨的讀寫操做具備原子性 setState(nextc); return true; } return false; }
2)、addWaiter( Node.EXCLUSIVE ) :建立一個同步隊列Node節點,同時綁定節點的模式爲獨佔模式,而且將建立的節點插入到同步隊列尾部;addWaiter( ) 方法是AQS提供方法。
private Node addWaiter(Node mode) { // model參數是獨佔模式,默認爲null; Node node = new Node(Thread.currentThread(), mode); // 將當前同步隊列的tail尾節點的地址引用賦值給pre變量 Node pred = tail; // 若是pre不爲null,說明同步隊列中存在節點 if (pred != null) { // 當前節點的前驅結點指向pre尾節點 node.prev = pred; // 使用CAS算法將當前節點設置爲尾節點,使用CAS保證其原子性 if (compareAndSetTail(pred, node)) { // 尾節點設置成功,將pre舊尾節點的後繼結點指向新尾節點node pred.next = node; return node; } } // 若是尾節點爲null,表示同步隊列中尚未節點,enq()方法將當前node節點插入到隊列中 enq(node); return node; }
3)、說完addWaiter( Node.EXCLUSIVE )方法,接下來講下acquireQueued()方法,它是怎樣使addWaiter()建立的節點中的線程獲取到state同步鎖的。(這個方法也是AQS提供的)
源碼走起:
final boolean acquireQueued(final Node node, int arg) { // 標誌cancelAcquire()方法是否執行 boolean failed = true; try { // 標誌是否中斷,默認爲false不中斷 boolean interrupted = false; for (;;) { // 獲取當前節點的前驅結點 final Node p = node.predecessor(); /** * 若是當前節點的前驅結點已是同步隊列的頭結點了,說明了兩點內容: * 一、其前驅結點已經獲取到了同步鎖了,而且鎖還沒釋放 * 二、其前驅結點已經獲取到了同步鎖了,可是鎖已經釋放了 * * 而後使用tryAcquire()方法去嘗試獲取同步鎖,若是前驅結點已經釋放了鎖,那麼就會獲取成功, * 不然同步鎖獲取失敗,繼續循環 */ if (p == head && tryAcquire(arg)) { // 將當前節點設置爲同步隊列的head頭結點 setHead(node); // 而後將當前節點的前驅結點的後繼結點置爲null,幫助進行垃圾回收 p.next = null; // help GC failed = false; // 返回中斷的標誌 return interrupted; } /** * shouldParkAfterFailedAcquire()是對當前節點的前驅結點的狀態進行判斷,以及去針對各類 * 狀態作出相應處理,因爲文章篇幅問題,具體源碼本文不作講解;只需知道若是前驅結點p的狀態爲 * SIGNAL的話,就返回true。 * * parkAndCheckInterrupt()方法會使當前線程進去waiting狀態,而且查看當前線程是否被中斷, * interrupted() 同時會將中斷標誌清除。 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 中斷標誌置爲true interrupted = true; } } finally { if (failed) /** * 若是for(;;)循環中出現異常,而且failed=false沒有執行的話,cancelAcquire方法 * 就會將當前線程的狀態置爲 node.CANCELLED 已取消狀態,而且將當前節點node移出 * 同步隊列。 */ cancelAcquire(node); } }
4)、最後說下 selfInterrupt() 方法, 這個方法就是將當前線程進行中斷:
static void selfInterrupt() { // 中斷當前線程 Thread.currentThread().interrupt(); }
①、公平鎖 FairSync 的加鎖 lock() 加鎖方法:
final void lock() { acquire(1); }
②、非公平鎖 NonfairSync 的加鎖 lock() 加鎖方法:上面講解源碼的時候有提到喲,還有印象嗎,沒印象的話也不要緊,不要哭 , 嘿嘿,我都準備好了。 源碼奉上:
final void lock() { /** * 看到這,是否是發現了什麼,非公平鎖在此處直觀看的話,發現比公平鎖多了這幾行代碼; * 這裏就是使得線程存在了一個搶佔,若是當前同步隊列中的head頭結點中 線程A 恰好釋放了同步鎖, * 而後此時 線程B 正好來了,那麼此時線程B就會獲取到鎖,而此時同步隊列中head頭結點的後繼結點中的 * 線程C 就沒法獲取到同步鎖,只能等待線程B釋放鎖後,嘗試獲取鎖了。 */ if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
③、除了上面那處不一樣以外,還有別的地方嗎;別急,再看看 acquire(1) 方法是否同樣呢?
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
誒呀,方法點進去都是同樣的呀,可不嘛,都是調用的AQS提供的 acquire(1) 方法;可是彆着急,上面在講解非公平鎖加鎖時,有提到的 tryAcquire(arg) 方法在AQS的不一樣子孫類中都有各自的實現的。如今打開公平鎖的 tryAcquire(arg) 方法看看其源碼與非公平鎖有什麼區別:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { /** * 經過對比源碼發現,公平鎖比非公平鎖多了這塊代碼: !hasQueuedPredecessors() * hasQueuedPredecessors() 是作什麼呢?就是判斷當前同步隊列中是否存在節點,若是存在節點呢, * 就返回true,因爲前面有個 !,那麼就是false,再根據 && 邏輯運算符的特性,不會繼續執行了; * * tryAcquire()方法直接返回false,後面的邏輯就和非公平鎖的一致了,就是建立Node節點,並將 * 節點加入到同步隊列尾; 公平鎖:發現當前同步隊列中存在節點,有線程在本身前面已經申請可鎖,那 * 本身就得乖乖的向後面排隊去。 * * 友情提示:在生活中,咱們也須要按照先來後到去排隊,保證素質; 還有就是怕大家不排隊被別人打了。 */ if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
鬆口氣,從中午一直寫到下午快四點了,先讓我歇口氣,快累成狗了;本文還剩下釋放鎖部分沒寫呢,歇口氣,喝口水繼續 。注意:ReentrantLock在釋放鎖的時候,並不區分公平鎖和非公平鎖。
①、unlock() 釋放鎖的方法:
public void unlock() { // 釋放鎖時,須要將state同步狀態變量值進行減 1,傳入參數 1 sync.release(1); }
②、release( int arg ) 方法解析:(此方法是AQS提供的)
public final boolean release(int arg) { // tryRelease方法:嘗試釋放鎖,成功true,失敗false if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 頭結點不爲空而且頭結點的waitStatus不是初始化節點狀況,而後喚醒此阻塞的線程 unparkSuccessor(h); return true; } return false; }
注意:這裏的判斷條件爲何是 h != null && h.waitStatus != 0 ?
h == null Head還沒初始化。初始狀況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。因此說,這裏若是還沒來得及入隊,就會出現head == null 的狀況。h != null && waitStatus == 0 代表後繼節點對應的線程仍在運行中,不須要喚醒。
h != null && waitStatus < 0 代表後繼節點可能被阻塞了,須要喚醒。
③、而後再來看看tryRelease(arg) 方法:
protected final boolean tryRelease(int releases) { // 當前state狀態值進行減一 int c = getState() - releases; // 若是當前獨佔鎖的擁有者不是當前線程,則拋出 非法監視器狀態 異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 更新state同步狀態值 setState(c); return free; }
④、最後看看unparkSuccessor(Node node) 方法:
private void unparkSuccessor(Node node) { // 獲取頭結點waitStatus int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 獲取當前節點的下一個節點 Node s = node.next; // 若是下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled狀態的節點 if (s == null || s.waitStatus > 0) { s = null; // 就從尾部節點開始找,到隊首,找到隊列第一個waitStatus<0的節點。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 若是當前節點的後繼結點不爲null,則將其節點中處於阻塞狀態的線程unpark喚醒 if (s != null) LockSupport.unpark(s.thread); }
注意:爲何要從後往前找第一個非Cancelled的節點呢?緣由以下:
因爲以前加鎖時的addWaiter( )方法的緣由;
private Node addWaiter(Node mode) { // model參數是獨佔模式,默認爲null; Node node = new Node(Thread.currentThread(), mode); // 將當前同步隊列的tail尾節點的地址引用賦值給pre變量 Node pred = tail; // 若是pre不爲null,說明同步隊列中存在節點 if (pred != null) { // 當前節點的前驅結點指向pre尾節點 node.prev = pred; // 使用CAS算法將當前節點設置爲尾節點,使用CAS保證其原子性 if (compareAndSetTail(pred, node)) { // 尾節點設置成功,將pre舊尾節點的後繼結點指向新尾節點node pred.next = node; return node; } } // 若是尾節點爲null,表示同步隊列中尚未節點,enq()方法將當前node節點插入到隊列中 enq(node); return node; }
從這裏能夠看到,節點入隊並非原子操做,也就是說,node.prev = pred ; compareAndSetTail( pred, node ) 這兩個地方能夠看做Tail入隊的原子操做,可是此時 pred.next = node; 還沒執行,若是這個時候執行了unparkSuccessor方法,就沒辦法從前日後找了,因此須要從後往前找。還有一點緣由,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,所以也是必需要從後往前遍歷纔可以遍歷徹底部的Node。
end! 長吸一口氣,終於本文算是寫完了,最後再看看有沒有錯別字,以及排排版。後續還會出一篇結合CountDownLatch源碼學習共享鎖(共享模式)的文章。