系列文章目錄java
上一篇 咱們學習了lock接口,本篇咱們就以ReentrantLock爲例,學習一下Lock鎖的基本的實現。咱們先來看看Lock接口中的方法與ReentrantLock對其實現的對照表:node
Lock 接口 | ReentrantLock 實現 |
---|---|
lock() | sync.lock() |
lockInterruptibly() | sync.acquireInterruptibly(1) |
tryLock() | sync.nonfairTryAcquire(1) |
tryLock(long time, TimeUnit unit) | sync.tryAcquireNanos(1, unit.toNanos(timeout)) |
unlock() | sync.release(1) |
newCondition() | sync.newCondition() |
從表中能夠看出,ReentrantLock對於Lock接口的實現都是直接「轉交」給sync對象的。編程
ReentrantLock只有一個sync屬性,別看只有一個屬性,這個屬性提供了全部的實現,咱們上面介紹ReentrantLock對Lock接口的實現的時候就說到,它對全部的Lock方法的實現都調用了sync的方法,這個sync就是ReentrantLock的屬性,它繼承了AQS.segmentfault
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); //... }
在Sync類中,定義了一個抽象方法lock,該方法應當由繼承它的子類來實現,關於繼承它的子類,咱們在下一節分析構造函數時再看。併發
ReentrantLock共有兩個構造函數:函數
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
默認的構造函數使用了非公平鎖,另一個構造函數經過傳入一個boolean類型的fair
變量來決定使用公平鎖仍是非公平鎖。其中,FairSync和NonfairSync的定義以下:工具
static final class FairSync extends Sync { final void lock() {//省略實現} protected final boolean tryAcquire(int acquires) {//省略實現} } static final class NonfairSync extends Sync { final void lock() {//省略實現} protected final boolean tryAcquire(int acquires) {//省略實現} }
這裏爲何默認建立的是非公平鎖呢?由於非公平鎖的效率高呀,當一個線程請求非公平鎖時,若是在發出請求的同時該鎖變成可用狀態,那麼這個線程會跳過隊列中全部的等待線程而得到鎖。有的同窗會說了,這不就是插隊嗎?
沒錯,這就是插隊!這也就是爲何它被稱做非公平鎖。
之因此使用這種方式是由於:源碼分析
在恢復一個被掛起的線程與該線程真正運行之間存在着嚴重的延遲。
在公平鎖模式下,你們講究先來後到,若是當前線程A在請求鎖,即便如今鎖處於可用狀態,它也得在隊列的末尾排着,這時咱們須要喚醒排在等待隊列隊首的線程H(在AQS中實際上是次頭節點),因爲恢復一個被掛起的線程而且讓它真正運行起來須要較長時間,那麼這段時間鎖就處於空閒狀態,時間和資源就白白浪費了,非公平鎖的設計思想就是將這段白白浪費的時間利用起來——因爲線程A在請求鎖的時候自己就處於運行狀態,所以若是咱們此時把鎖給它,它就會當即執行本身的任務,所以線程A有機會在線程H徹底喚醒以前得到、使用以及釋放鎖。這樣咱們就能夠把線程H恢復運行的這段時間給利用起來了,結果就是線程A更早的獲取了鎖,線程H獲取鎖的時刻也沒有推遲。所以提升了吞吐量。性能
固然,非公平鎖僅僅是在當前線程請求鎖,而且鎖處於可用狀態時有效,當請求鎖時,鎖已經被其餘線程佔有時,就只能仍是老老實實的去排隊了。學習
不管是非公平鎖的實現NonfairSync仍是公平鎖的實現FairSync,它們都覆寫了lock方法和tryAcquire方法,這兩個方法都將用於獲取一個鎖。
關於ReentrantLock對於lock方法的公平鎖的實現邏輯,咱們在逐行分析AQS源碼(1)——獨佔鎖的獲取中已經講過了,這裏再也不贅述。若是你尚未看過那篇文章或者還不瞭解AQS,建議先去看一下那一篇文章,而後再讀下文。
接下來咱們看看非公平鎖的實現邏輯:
// NonfairSync中的lock方法 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
對比公平鎖中的lock方法:
// FairSync中的lock方法 final void lock() { acquire(1); }
可見,相比公平鎖,非公平鎖在當前鎖沒有被佔用時,能夠直接嘗試去獲取鎖,而不用排隊,因此它在一開始就嘗試使用CAS操做去搶鎖,只有在該操做失敗後,纔會調用AQS的acquire方法。
因爲acquire方法中除了tryAcquire由子類實現外,其他都由AQS實現,咱們在前面的文章中已經介紹的很詳細了,這裏再也不贅述,咱們僅僅看一下非公平鎖的tryAcquire方法實現:
// NonfairSync中的tryAcquire方法實現 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
它調用了Sync類的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 只有這一處和公平鎖的實現不一樣,其它的徹底同樣。 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
咱們能夠拿它和公平鎖的tryAcquire對比一下:
// FairSync中的tryAcquire方法實現 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
看見沒?這兩個方法幾乎如出一轍,惟一的區別就是非公平鎖在搶鎖時再也不須要調用hasQueuedPredecessors
方法先去判斷是否有線程排在本身前面,而是直接爭鎖,其它的徹底和公平鎖一致。
前面的lock方法是阻塞式的,搶到鎖就返回,搶不到鎖就將線程掛起,而且在搶鎖的過程當中是不響應中斷的(關於不響應中斷,見這篇文章末尾的分析),lockInterruptibly提供了一種響應中斷的方式,在ReentrantLock中,不管是公平鎖仍是非公平鎖,這個方法的實現都是同樣的:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
他們都調用了AQS的acquireInterruptibly
方法:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
該方法首先檢查當前線程是否已經被中斷過了,若是已經被中斷了,則當即拋出InterruptedException
(這一點是lockInterruptibly要求的,參見上一篇Lock接口的介紹)。
若是調用這個方法時,當前線程尚未被中斷過,則接下來先嚐試用普通的方法來獲取鎖(tryAcquire
)。若是獲取成功了,則萬事大吉,直接就返回了;不然,與前面的lock方法同樣,咱們須要將當前線程包裝成Node扔進等待隊列,所不一樣的是,此次,在隊列中嘗試獲取鎖時,若是發生了中斷,咱們須要對它作出響應, 並拋出異常
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; //與acquireQueued方法的不一樣之處 } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); //與acquireQueued方法的不一樣之處 } } finally { if (failed) cancelAcquire(node); } }
若是你在上面分析lock方法的時候已經理解了acquireQueued方法,那麼再看這個方法就很輕鬆了,咱們把lock方法中的acquireQueued
拿出來和上面對比一下:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //不一樣之處 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; //不一樣之處 } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; //不一樣之處 } } finally { if (failed) cancelAcquire(node); } }
經過代碼對比能夠看出,doAcquireInterruptibly
和acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
的調用本質上講並沒有區別。只不過對於addWaiter(Node.EXCLUSIVE)
,一個是外部調用,經過參數傳進來;一個是直接在方法內部調用。因此這兩個方法的邏輯幾乎是同樣的,惟一的不一樣就是在doAcquireInterruptibly
中,當咱們檢測到中斷後,再也不是簡單的記錄中斷狀態,而是直接拋出InterruptedException
。
當拋出中斷異常後,在返回前,咱們將進入finally代碼塊進行善後工做,很明顯,此時failed是爲true的,咱們將調用cancelAcquire
方法:
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // 由當前節點向前遍歷,跳過那些已經被cancel的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 從當前節點向前開始查找,找到第一個waitStatus>0的Node, 該節點爲pred // predNext便是pred節點的下一個節點 // 到這裏可知,pred節點是沒有被cancel的節點,可是pred節點日後,一直到當前節點Node都處於被Cancel的狀態 Node predNext = pred.next; //將當前節點的waitStatus的狀態設爲Node.CANCELLED node.waitStatus = Node.CANCELLED; // 若是當前節點是尾節點,則將以前找到的節點pred從新設置成尾節點,並將pred節點的next屬性由predNext修改爲Null // 這一段本質上是將pred節點後面的節點所有移出隊列,由於它們都被cancel掉了 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 到這裏說明當前節點已經不是尾節點了,或者設置新的尾節點失敗了 // 咱們前面說過,併發條件下,什麼都有可能發生 // 即在當前線程運行這段代碼的過程當中,其餘線程可能已經入隊了,成爲了新的尾節點 // 雖然咱們以前已經將當前節點的waitStatus設爲了CANCELLED // 可是由咱們在分析lock方法的文章可知,新的節點入隊後會設置鬧鐘,將找一個沒有CANCEL的前驅節點,將它的status設置成SIGNAL以喚醒本身。 // 因此,在當前節點的後繼節點入隊後,可能將當前節點的waitStatus修改爲了SIGNAL // 而在這時,咱們發起了中斷,又將這個waitStatus修改爲CANCELLED // 因此在當前節點出隊前,要負責喚醒後繼節點。 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
這個cancelAcquire
方法不只是取消了當前節點的排隊,還會同時將當前節點以前的那些已經CANCEL掉的節點移出隊列。不過這裏尤爲須要注意的是,這裏是在併發條件下,此時此刻,新的節點可能已經入隊了,成爲了新的尾節點,這將會致使node == tail && compareAndSetTail(node, pred)
這一條件失敗。
這個函數的前半部分是就是基於當前節點就是隊列的尾節點的,即在執行這個函數時,沒有新的節點入隊,這部分的邏輯比較簡單,你們直接看代碼中的註釋解釋便可。
然後半部分是基於有新的節點加進來,當前節點已經再也不是尾節點的狀況,咱們詳細看看這else部分:
if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); //將pred節點的後繼節點改成當前節點的後繼節點 } else { unparkSuccessor(node); } node.next = node; // help GC }
(這裏再說明一下pred
變量所表明的含義:它表示了從當前節點向前遍歷所找到的第一個沒有被cancel的節點。)
執行到else代碼塊,則咱們目前的情況以下:
在這種狀況下,咱們將執行if語句,將pred節點的後繼節點改成當前節點的後繼節點(compareAndSetNext(pred, predNext, next)
),即將從pred節點開始(不包含pred節點)一直到當前節點(包括當前節點)之間的全部節點所有移出隊列,由於他們都是被cancel的節點。固然這是基於必定條件的,條件爲:
上面這三個條件保證了pred節點確實是一個正在正常等待鎖的線程,而且它的waitStatus屬性爲SIGNAL。
若是這一條件沒法被知足,那麼咱們將直接經過unparkSuccessor喚醒它的後繼節點。
到這裏,咱們總結一下cancelAcquire
方法:
有的同窗就要問了,那第3條只是把當前節點的後繼節點喚醒了,並無將當前節點移除隊列呀?可是當前節點已經取消排隊了,不是應該移除隊列嗎?
彆着急,在後繼節點被喚醒後,它會在搶鎖時調用的shouldParkAfterFailedAcquire
方法裏面跳過已經CANCEL的節點,那個時候,當前節點就會被移出隊列了。
因爲tryLock僅僅是用於檢查鎖在當前調用的時候是否是可得到的,因此即便如今使用的是非公平鎖,在調用這個方法時,當前線程也會直接嘗試去獲取鎖,哪怕這個時候隊列中還有在等待中的線程。因此這一方法對於公平鎖和非公平鎖的實現是同樣的,它被定義在Sync類中,由FairSync和NonfairSync直接繼承使用:
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
這個nonfairTryAcquire
咱們在上面分析非公平鎖的lock方法時已經講過了,這裏只是簡單的方法複用。該方法不存在任何和隊列相關的操做,僅僅就是直接嘗試去獲鎖,成功了就返回true,失敗了就返回false。
可能你們會以爲公平鎖也使用這種方式去tryLock就喪失了公平性,可是這種方式在某些狀況下是很是有用的,若是你仍是想維持公平性,那應該使用帶超時機制的tryLock
:
與當即返回的tryLock()
不一樣,tryLock(long timeout, TimeUnit unit)
帶了超時時間,因此是阻塞式的,而且在獲取鎖的過程當中能夠響應中斷異常:
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
與lockInterruptibly
方法同樣,該方法首先檢查當前線程是否已經被中斷過了,若是已經被中斷了,則當即拋出InterruptedException
。
隨後咱們經過調用tryAcquire
和doAcquireNanos(arg, nanosTimeout)
方法來嘗試獲取鎖,注意,這時公平鎖和非公平鎖對於tryAcquire
方法就有不一樣的實現了,公平鎖首先會檢查當前有沒有別的線程在隊列中排隊,關於公平鎖和非公平鎖對tryAcquire
的不一樣實現上文已經講過了,這裏再也不贅述。咱們直接來看doAcquireNanos
,這個方法其實和前面說的doAcquireInterruptibly
方法很像,咱們經過將相同的部分註釋掉,直接看不一樣的部分:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; /*final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false;*/ return true; // doAcquireInterruptibly中爲 return /*}*/ nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); /* } } finally { if (failed) cancelAcquire(node); }*/ }
能夠看出,這兩個方法的邏輯大差不差,只是doAcquireNanos
多了對於截止時間的檢查。
不過這裏有兩點須要注意,一個是doAcquireInterruptibly
是沒有返回值的,而doAcquireNanos
是有返回值的。這是由於doAcquireNanos
有可能由於獲取到鎖而返回,也有可能由於超時時間到了而返回,爲了區分這兩種狀況,由於超時時間而返回時,咱們將返回false,表明並無獲取到鎖。
另一點值得注意的是,上面有一個nanosTimeout > spinForTimeoutThreshold
的條件,在它知足的時候纔會將當前線程掛起指定的時間,這個spinForTimeoutThreshold是個啥呢:
/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;
它就是個閾值,是爲了提高性能用的。若是當前剩下的等待時間已經很短了,咱們就直接使用自旋的形式等待,而不是將線程掛起,可見做者爲了儘量地優化AQS鎖的性能費足了心思。
unlock操做用於釋放當前線程所佔用的鎖,這一點對於公平鎖和非公平鎖的實現是同樣的,因此該方法被定義在Sync類中,由FairSync和NonfairSync直接繼承使用:
public void unlock() { sync.release(1); }
關於ReentrantLock的釋放鎖的操做,咱們在逐行分析AQS源碼(2)——獨佔鎖的釋放中已經詳細的介紹過了,這裏就再也不贅述了。
ReentrantLock自己並無實現Condition方法,它是直接調用了AQS的newCondition
方法
public Condition newCondition() { return sync.newCondition(); }
而AQS的newCondtion
方法就是簡單地建立了一個ConditionObject
對象:
final ConditionObject newCondition() { return new ConditionObject(); }
關於ConditionObject
對象的源碼分析,請參見 逐行分析AQS源碼(4)——Condition接口實現
ReentrantLock對於Lock接口方法的實現大多數是直接調用了AQS的方法,AQS中已經完成了大多數邏輯的實現,子類只須要直接繼承使用便可,這足見AQS在併發編程中的地位。固然,有一些邏輯仍是須要ReentrantLock本身去實現的,例如tryAcquire的邏輯。
AQS在併發編程中的地位舉足輕重,只要弄懂了它,咱們在學習其餘併發編程工具的時候就會容易不少。
(完)