ReentrantLock能夠有公平鎖和非公平鎖的不一樣實現,只要在構造它的時候傳入不一樣的布爾值,繼續跟進下源碼咱們就能發現,關鍵在於實例化內部變量sync
的方式不一樣,以下所示java
/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平鎖內部是FairSync,非公平鎖內部是NonfairSync。而無論是FairSync仍是NonfariSync,都間接繼承自AbstractQueuedSynchronizer這個抽象類,以下圖所示node
該抽象類爲咱們的加鎖和解鎖過程提供了統一的模板方法,只是一些細節的處理由該抽象類的實現類本身決定。因此在解讀ReentrantLock(重入鎖)的源碼以前,有必要了解下AbstractQueuedSynchronizer。算法
AbstractQueuedSynchronizer,簡稱AQS,爲構建不一樣的同步組件(重入鎖、讀寫鎖、CountDownLatch、Semphore等)提供了可擴展的基礎框架,以下圖所示。編程
AQS以模板方法模式在內部定義了獲取和釋放同步狀態的模板方法,並留下鉤子函數供子類繼承時進行擴展,由子類決定在獲取和釋放同步狀態時的細節,從而實現知足自身功能特性的需求。除此以外,AQS經過內部的同步隊列管理獲取同步狀態失敗的線程,向實現者屏蔽了線程阻塞和喚醒的細節。安全
CAS算法是AbstractQueuedSynchronizer的核心。數據結構
AbstractQueuedSynchronizer類底層的數據結構是使用雙向鏈表,是隊列的一種實現,故也可當作是隊列,其中Sync queue,即同步隊列,是雙向鏈表,包括head結點和tail結點,head結點主要用做後續的調度。而Condition queue不是必須的,其是一個單向鏈表,只有當使用Condition時,纔會存在此單向鏈表。而且可能會有多個Condition queue。併發
AQS中同步等待隊列的實現是一個帶頭尾指針(這裏用指針表示引用是爲了後面講解源碼時能夠更直觀形象,何況引用自己是一種受限的指針)且不帶哨兵結點(後文中的頭結點表示隊列首元素結點,不是指哨兵結點)的雙向鏈表。app
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head;//指向隊列首元素的頭指針 /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail;//指向隊列尾元素的尾指針
head是頭指針,指向隊列的首元素;tail是尾指針,指向隊列的尾元素。而隊列的元素結點Node定義在AQS內部,主要有以下幾個成員變量框架
volatile Node prev; //指向前一個結點的指針 volatile Node next; //指向後一個結點的指針 volatile Thread thread; //當前結點表明的線程 volatile int waitStatus; //等待狀態
同步隊列的結構以下圖所示函數
爲了接下來可以更好的理解加鎖和解鎖過程的源碼,對該同步隊列的特性進行簡單的講解:
爲了加深理解,還能夠在閱讀源碼的過程當中思考下這個問題:
這個同步隊列是FIFO隊列,也就是說先在隊列中等待的線程將比後面的線程更早的獲得鎖,那ReentrantLock是如何基於這個FIFO隊列實現非公平鎖的?
/** * The synchronization state. */ private volatile int state;
這是一個帶volatile前綴的int值,是一個相似計數器的東西。在不一樣的同步組件中有不一樣的含義。以ReentrantLock爲例,state能夠用來表示該鎖被線程重入的次數。當state爲0表示該鎖不被任何線程持有;當state爲1表示線程剛好持有該鎖1次(未重入);當state大於1則表示鎖被線程重入state次。由於這是一個會被併發訪問的量,爲了防止出現可見性問題要用volatile進行修飾。
/** * The current owner of exclusive mode synchronization. */ private transient Thread exclusiveOwnerThread;
如註釋所言,這是在獨佔同步模式下標記持有同步狀態線程的。ReentrantLock就是典型的獨佔同步模式,該變量用來標識鎖被哪一個線程持有。
瞭解AQS的主要結構後,就能夠開始進行ReentrantLock的源碼解讀了。因爲非公平鎖在實際開發中用的比較多,故以講解非公平鎖的源碼爲主。如下面這段對非公平鎖使用的代碼爲例:
public class NoFairLockTest { public static void main(String[] args) { //建立非公平鎖 ReentrantLock lock = new ReentrantLock(false); try { //加鎖 lock.lock(); //模擬業務處理用時 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { //釋放鎖 lock.unlock(); } } }
加鎖流程從lock.lock()
開始
public void lock() { sync.lock(); }
進入該源碼,正確找到sycn的實現類後能夠看到真正有內容的入口方法
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ //加鎖流程真正意義上的入口 final void lock() { //以cas方式嘗試將AQS中的state從0更新爲1 if (compareAndSetState(0, 1)) //獲取鎖成功則將當前線程標記爲持有鎖的線程,而後直接返回 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);//獲取鎖失敗則執行該方法 }
首先嚐試快速獲取鎖,以cas的方式將state的值更新爲1,只有當state的原值爲0時更新才能成功,由於state在ReentrantLock的語境下等同於鎖被線程重入的次數,這意味着只有當前鎖未被任何線程持有時該動做纔會返回成功。若獲取鎖成功,則將當前線程標記爲持有鎖的線程,而後整個加鎖流程就結束了。若獲取鎖失敗,則執行acquire方法
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
該方法主要的邏輯都在if判斷條件中,這裏面有3個重要的方法tryAcquire(),addWaiter()和acquireQueued(),這三個方法中分別封裝了加鎖流程中的主要處理邏輯,理解了這三個方法到底作了哪些事情,整個加鎖流程就清晰了。
tryAcquire是AQS中定義的鉤子方法,以下所示
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
該方法默認會拋出異常,強制同步組件經過擴展AQS來實現同步功能的時候必須重寫該方法,ReentrantLock在公平和非公平模式下對此有不一樣實現,非公平模式的實現以下:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
底層調用了nonfairTryAcquire()
從方法名上咱們就能夠知道這是非公平模式下嘗試獲取鎖的方法,具體方法實現以下
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread();//獲取當前線程實例 int c = getState();//獲取state變量的值,即當前鎖被重入的次數 if (c == 0) { //state爲0,說明當前鎖未被任何線程持有 if (compareAndSetState(0, acquires)) { //以cas方式獲取鎖 setExclusiveOwnerThread(current); //將當前線程標記爲持有鎖的線程 return true;//獲取鎖成功,非重入 } } else if (current == getExclusiveOwnerThread()) { //當前線程就是持有鎖的線程,說明該鎖被重入了 int nextc = c + acquires;//計算state變量要更新的值 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc);//非同步方式更新state值 return true; //獲取鎖成功,重入 } return false; //走到這裏說明嘗試獲取鎖失敗 }
這是非公平模式下獲取鎖的通用方法。它囊括了當前線程在嘗試獲取鎖時的全部可能狀況:
由於咱們用state來統計鎖被線程重入的次數,因此當前線程嘗試獲取鎖的操做是否成功能夠簡化爲:state值是否成功累加1,是則嘗試獲取鎖成功,不然嘗試獲取鎖失敗。
其實這裏還能夠思考一個問題:
nonfairTryAcquire已經實現了一個囊括全部可能狀況的嘗試獲取鎖的方式,爲什麼在剛進入lock方法時還要經過compareAndSetState(0, 1)去獲取鎖,畢竟後者只有在鎖未被任何線程持有時才能執行成功,咱們徹底能夠把compareAndSetState(0, 1)去掉,對最後的結果不會有任何影響。
這種在進行通用邏輯處理以前針對某些特殊狀況提早進行處理的方式在後面還會看到,一個直觀的想法就是它能提高性能,而代價是犧牲必定的代碼簡潔性。
退回到上層的acquire方法,
public final void acquire(int arg) { if (!tryAcquire(arg) && //當前線程嘗試獲取鎖,若獲取成功返回true,不然false //只有當前線程獲取鎖失敗纔會執行者這部分代碼 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg) 返回成功,則說明當前線程成功獲取了鎖(第一次獲取或者重入),由取反和&&可知,整個流程到這結束,只有當前線程獲取鎖失敗纔會執行後面的判斷。先來看 addWaiter(Node.EXCLUSIVE) 部分,這部分代碼描述了當線程獲取鎖失敗時如何安全的加入同步等待隊列。這部分代碼能夠說是整個加鎖流程源碼的精華,充分體現了併發編程的藝術性。
addWaiter函數完成的功能是將調用此方法的線程封裝成爲一個結點並放入Sync queue的尾部。
private Node addWaiter(Node mode) { //首先建立一個新節點,並將當前線程實例封裝在內部,mode這裏爲null. // mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享),默認是獨佔模式 Node node = new Node(Thread.currentThread(), mode); // 嘗試快速方式將當前node結點直接放到隊尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾結點爲空(即尚未被初始化過),或者是compareAndSetTail操做失敗,則入隊列 enq(node); return node; }
首先建立了一個新節點,並將當前線程實例封裝在其內部,以後咱們直接看enq(node)方法就能夠了,中間這部分邏輯在enq(node)中都有,之因此加上這部分「重複代碼」和嘗試獲取鎖時的「重複代碼」同樣,對某些特殊狀況
進行提早處理,犧牲必定的代碼可讀性換取性能提高。
private Node enq(final Node node) { for (;;) { Node t = tail; //t指向當前隊列的最後一個節點,隊列爲空則爲null if (t == null) { // 隊列爲空,建立一個空的標誌結點做爲head結點,並將tail也指向它 if (compareAndSetHead(new Node())) //構造新結點,CAS方式設置爲隊列首元素,當head==null時更新成功 tail = head;//尾指針指向首結點 } else { //隊列不爲空 node.prev = t; if (compareAndSetTail(t, node)) { //CAS將尾指針指向當前結點,當t(原來的尾指針)==tail(當前真實的尾指針)時執行成功 t.next = node; //原尾結點的next指針指向當前結點 return t; } } } }
這裏有兩個CAS操做:
/** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); }
/** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
外層的for循環保證了全部獲取鎖失敗的線程通過失敗重試後最後都能加入同步隊列。
由於AQS的同步隊列是不帶哨兵結點的,故當隊列爲空時要進行特殊處理,這部分在if分句中。注意當前線程所在的結點不能直接插入空隊列,由於阻塞的線程是由前驅結點進行喚醒的。故先要插入一個結點做爲隊列首元素,當鎖釋放時由它來喚醒後面被阻塞的線程,從邏輯上這個隊列首元素也能夠表示當前正獲取鎖的線程,雖然並不必定真實持有其線程實例。
首先經過new Node()建立一個空結點,而後以CAS方式讓頭指針指向該結點(該結點並不是當前線程所在的結點),若該操做成功,則將尾指針也指向該結點。這部分的操做流程能夠用下圖表示
當隊列不爲空,則執行通用的入隊邏輯,這部分在else分句中
else { node.prev = t;//step1:待插入結點pre指針指向原尾結點 if (compareAndSetTail(t, node)) { step2:CAS方式更改尾指針 t.next = node; //原尾結點next指針指向新的結點 return t; } }
首先當前線程所在的結點的前向指針pre指向當前線程認爲的尾結點,源碼中用t表示。而後以CAS的方式將尾指針指向當前結點,該操做僅當tail=t,即尾指針在進行CAS前未改變時成功。若CAS執行成功,則將原尾結點的後向指針next指向新的尾結點。整個過程以下圖所示
整個入隊的過程並不複雜,是典型的CAS加失敗重試的樂觀鎖策略。其中只有更新頭指針和更新尾指針這兩步進行了CAS同步,能夠預見高併發場景下性能是很是好的。可是本着質疑精神咱們不由會思考下這麼作真的線程安全嗎?
爲了確保真的理解了它,能夠思考這個問題:把enq方法圖中的4放到5以後,整個入隊的過程還線程安全嗎?
到這爲止,獲取鎖失敗的線程加入同步隊列的邏輯就結束了。可是線程加入同步隊列後會作什麼咱們並不清楚,這部分在acquireQueued方法中
此時的狀態是:
該線程獲取資源失敗,已經被放入等待隊列尾部了。
下面就是:
線程在等待隊列中獲取資源,一直獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 死循環,正常狀況下線程只有得到鎖才能跳出循環 for (;;) { final Node p = node.predecessor();// 得到當前線程所在結點的前驅結點 // 第一個if分句 if (p == head && tryAcquire(arg)) { setHead(node); // 將當前結點設置爲隊列頭結點 p.next = null; // help GC failed = false; return interrupted;// 正常狀況下死循環惟一的出口 } // 第二個if分句 if (shouldParkAfterFailedAcquire(p, node) && // 判斷是否要阻塞當前線程 parkAndCheckInterrupt()) // 阻塞當前線程 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
這段代碼主要的內容都在for循環中,這是一個死循環,主要有兩個if分句構成。
第一個if分句中,當前線程首先會判斷前驅結點是不是頭結點,若是是則嘗試獲取鎖,獲取鎖成功則會設置當前結點爲頭結點(更新頭指針)。爲何必須前驅結點爲頭結點才嘗試去獲取鎖?由於頭結點表示當前正佔有鎖的線程,正常狀況下該線程釋放鎖後會通知後面結點中阻塞的線程,阻塞線程被喚醒後去獲取鎖,這是咱們但願看到的。然而還有一種狀況,就是前驅結點取消了等待,此時當前線程也會被喚醒,這時候就不該該去獲取鎖,而是往前回溯一直找到一個沒有取消等待的結點,而後將自身鏈接在它後面。一旦咱們成功獲取了鎖併成功將自身設置爲頭結點,就會跳出for循環。不然就會執行第二個if分句:確保前驅結點的狀態爲SIGNAL,而後阻塞當前線程。
先來看shouldParkAfterFailedAcquire(p, node),從方法名上咱們能夠大概猜出這是判斷是否要阻塞當前線程的,方法內容以下
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //狀態爲SIGNAL /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //狀態爲CANCELLED, /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //狀態爲初始化狀態(ReentrentLock語境下) /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
能夠看到針對前驅結點pred的狀態會進行不一樣的處理
其實這個方法的含義很簡單,就是確保當前結點的前驅結點的狀態爲SIGNAL,SIGNAL意味着線程釋放鎖後會喚醒後面阻塞的線程。畢竟,只有確保可以被喚醒,當前線程才能放心的阻塞。
可是要注意只有在前驅結點已是SIGNAL狀態後纔會執行後面的方法當即阻塞,對應上面的第一種狀況。其餘兩種狀況則由於返回false而從新執行一遍
for循環。這種延遲阻塞其實也是一種高併發場景下的優化,試想我若是在從新執行循環的時候成功獲取了鎖,是否是線程阻塞喚醒的開銷就省了呢?
最後咱們來看看阻塞線程的方法parkAndCheckInterrupt
shouldParkAfterFailedAcquire返回true表示應該阻塞當前線程,則會執行parkAndCheckInterrupt方法,這個方法比較簡單,底層調用了LockSupport來阻塞當前線程,源碼以下:
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
該方法內部經過調用LockSupport的park方法來阻塞當前線程,不清楚LockSupport的能夠看看這裏。LockSupport功能簡介及原理淺析
下面經過一張流程圖來講明線程從加入同步隊列到成功獲取鎖的過程
歸納的說,線程在同步隊列中會嘗試獲取鎖,失敗則被阻塞,被喚醒後會不停的重複這個過程,直到線程真正持有了鎖,並將自身結點置於隊列頭部。
ReentrantLock非公平模式下的加鎖流程以下
解鎖的源碼相對簡單,源碼以下:
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { //釋放鎖(state-1),若釋放後鎖可被其餘線程獲取(state=0),返回true Node h = head; //當前隊列不爲空且頭結點狀態不爲初始化狀態(0) if (h != null && h.waitStatus != 0) unparkSuccessor(h); //喚醒同步隊列中被阻塞的線程 return true; } return false; }
正確找到sync的實現類,找到真正的入口方法,主要內容都在一個if語句中,先看下判斷條件tryRelease方法
protected final boolean tryRelease(int releases) { int c = getState() - releases; //計算待更新的state值 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //待更新的state值爲0,說明持有鎖的線程未重入,一旦釋放鎖其餘線程將能獲取 free = true; setExclusiveOwnerThread(null);//清除鎖的持有線程標記 } setState(c);//更新state值 return free; }
tryRelease其實只是將線程持有鎖的次數減1,即將state值減1,若減小後線程將徹底釋放鎖(state值爲0),則該方法將返回true,不然返回false。因爲執行該方法的線程必然持有鎖,故該方法不須要任何同步操做。
若當前線程已經徹底釋放鎖,即鎖可被其餘線程使用,則還應該喚醒後續等待線程。不過在此以前須要進行兩個條件的判斷:
接下來就是喚醒線程的操做,unparkSuccessor(h)源碼以下
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
通常狀況下只要喚醒後繼結點的線程就好了,可是後繼結點可能已經取消等待,因此從隊列尾部往前回溯,找到離頭結點最近的正常結點,並喚醒其線程。
首先是 ReentrantLock 的入口
public void lock() { sync.lock(); }
其公平鎖的實現以下:
final void lock() { acquire(1); }
看到這裏,就知道了,就是上面非公平鎖的實現中的方法
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
回憶一下,tryAcquire是AQS中定義的鉤子方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
ReentrantLock在公平和非公平模式下對此有不一樣實現,非公平模式上面已經介紹過了,公平模式的實現以下:
/** * 獲取公平鎖的方法 * * 1)獲取鎖數量c * 1.1)若是c==0,若是當前線程是等待隊列中的頭節點,使用CAS將state(鎖數量)從0設置爲1,若是設置成功,當前線程獨佔鎖-->請求成功 * 1.2)若是c!=0,判斷當前的線程是否是就是當下獨佔鎖的線程,若是是,就將當前的鎖數量狀態值+1(這也就是可重入鎖的名稱的來源)-->請求成功 * 最後,請求失敗後,將當前線程鏈入隊尾並掛起,以後等待被喚醒。 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 獲取鎖數量c if (c == 0) { if (!hasQueuedPredecessors() && // 若是當前線程是等待隊列中的頭節點 compareAndSetState(0, acquires)) { // 使用CAS將state(鎖數量)從0設置爲1 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 判斷當前的線程是否是就是當下獨佔鎖的線程 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
下邊的代碼與非公平鎖同樣。
公平鎖模式下,對鎖的獲取有嚴格的條件限制。在同步隊列有線程等待的狀況下,全部線程在獲取鎖前必須先加入同步隊列。隊列中的線程按加入隊列的前後次序得到鎖。
從公平鎖加鎖的入口開始,
對比非公平鎖,少了非重入式獲取鎖的方法(即少了CAS嘗試將state從0設爲1,進而得到鎖的過程),這是第一個不一樣點。
接着看獲取鎖的通用方法tryAcquire(),多了須要判斷當前線程是否在等待隊列首部的邏輯(實際上就是少了再次插隊的過程,可是CAS獲取仍是有的)
在真正CAS獲取鎖以前加了判斷,內容以下
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
從方法名咱們就可知道這是判斷隊列中是否有優先級更高的等待線程,隊列中哪一個線程優先級最高?因爲頭結點是當前獲取鎖的線程,隊列中的第二個結點表明的線程優先級最高。
那麼咱們只要判斷隊列中第二個結點是否存在以及這個結點是否表明當前線程就好了。這裏分了兩種狀況進行探討:
因此(s = h.next) == null 就是用來判斷2剛執行成功但還未執行3這種狀況的。這種狀況第二個結點必然屬於其餘線程。
以上兩種狀況都會使該方法返回true,即當前有優先級更高的線程在隊列中等待,那麼當前線程將不會執行CAS操做去獲取鎖,保證了線程獲取鎖的順序與加入同步隊列的順序一致,很好的保證了公平性,但也增長了獲取鎖的成本。
由FIFO隊列的特性知,先加入同步隊列等待的線程會比後加入的線程更靠近隊列的頭部,那麼它將比後者更早的被喚醒,它也就能更早的獲得鎖。從這個意義上,對於在同步隊列中等待的線程而言,它們得到鎖的順序和加入同步隊列的順序一致,這顯然是一種公平模式。然而,線程並不是只有在加入隊列後纔有機會得到鎖,哪怕同步隊列中已有線程在等待,非公平鎖的不公平之處就在於此。回看下非公平鎖的加鎖流程,線程在進入同步隊列等待以前有兩次搶佔鎖的機會:
只有這兩次獲取鎖都失敗後,線程纔會構造結點並加入同步隊列等待。而線程釋放鎖時是先釋放鎖(修改state值),而後才喚醒後繼結點的線程的。試想下這種狀況,線程A已經釋放鎖,但還沒來得及喚醒後繼線程C,而這時另外一個線程B恰好嘗試獲取鎖,此時鎖剛好不被任何線程持有,它將成功獲取鎖而不用加入隊列等待。線程C被喚醒嘗試獲取鎖,而此時鎖已經被線程B搶佔,故而其獲取失敗並繼續在隊列中等待。整個過程以下圖所示
若是以線程第一次嘗試獲取鎖到最後成功獲取鎖的次序來看,非公平鎖確實很不公平。由於在隊列中等待好久的線程相比還未進入隊列等待的線程並無優先權,甚至競爭也處於劣勢:在隊列中的線程要等待其餘線程喚醒,在獲取鎖以前還要檢查前驅結點是否爲頭結點。在鎖競爭激烈的狀況下,在隊列中等待的線程可能遲遲競爭不到鎖。這也就非公平在高併發狀況下會出現的飢餓問題。那咱們再開發中爲何大多使用會致使飢餓的非公平鎖?很簡單,由於它性能好啊。
非公平鎖對鎖的競爭是搶佔式的(隊列中線程除外),線程在進入等待隊列前能夠進行兩次嘗試,這大大增長了獲取鎖的機會。這種好處體如今兩個方面:
另外一處就是入隊的操做,將同步隊列非空的狀況提早處理
這兩部分的代碼在以後的通用邏輯處理中都有,很顯然屬於重複代碼,但由於避免了執行無心義的流程代碼,好比for循環,獲取同步狀態等,高併發場景下也能減小CAS競爭失敗的可能。