系列傳送門:java
ReentrantLock
位於java.util.concurrent(J.U.C)
包下,是Lock接口的實現類。基本用法與synchronized
類似,都具有可重入互斥的特性,但擁有更強大的且靈活的鎖機制。本篇主要從源碼角度解析ReentrantLock,一些基本的概念以及Lock接口能夠戳這篇:Java併發讀書筆記:Lock與ReentrantLock編程
ReentrantLock推薦用法以下:c#
class X { //定義鎖對象 private final ReentrantLock lock = new ReentrantLock(); // ... //定義須要保證線程安全的方法 public void m() { //加鎖 lock.lock(); try{ // 保證線程安全的代碼 } // 使用finally塊保證釋放鎖 finally { lock.unlock() } } }
獨佔鎖表示:同時只能有一個線程能夠獲取該鎖,其餘獲取該鎖的線程會被阻塞而被放入該所的AQS阻塞隊列裏面。這部分能夠查看:Java併發包源碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別數組
Sync直接繼承自AQS,NonfairSync和FairSync繼承了Sync,實現了獲取鎖的公平與非公平策略。安全
ReentrantLock中的操做都是委託給Sync對象來實際操做的。併發
/** Synchronizer providing all implementation mechanics */ private final Sync sync;
默認是使用非公平鎖:NonfairSync
,能夠傳入參數來指定是否使用公平鎖。函數
// 默認使用的是 非公平的策略 public ReentrantLock() { sync = new NonfairSync(); } // 經過fair參數指定 策略 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
在ReentrantLock中,AQS的state狀態值表示線程獲取該鎖的可重入次數,在默認狀況下:學習
ReentrantLock的lock()方法委託給了sync類,根據建立sync的具體實現決定具體的邏輯:ui
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { // CAS 設置獲取state值 if (compareAndSetState(0, 1)) // 將當前線程設置爲鎖的持有者 setExclusiveOwnerThread(Thread.currentThread()); else // 設置失敗, 調用AQS的acquire方法 acquire(1); }
state值的初始狀態爲0,也就是說,第一個線程的CAS操做會成功將0設置爲1,表示當前線程獲取到了鎖,而後經過setExclusiveOwnerThread
方法將當前線程設置爲鎖的持有者。this
若是這時,其餘線程也試圖獲取該鎖,則CAS失敗,走到acquire的邏輯。
// AQS#acquire public final void acquire(int arg) { // 調用ReentrantLock重寫的tryAcquire方法 if (!tryAcquire(arg) && // tryAcquire方法返回false,則把當前線程放入AQS阻塞隊列中 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
欸,這個時候咱們應該就有感受了,咱們以前在分析AQS的核心方法的時候說到過,AQS是基於模板模式設計的,下面的tryAcquire方法就是留給子類實現的,而NonfairSync中是這樣實現的:
//NonfairSync#tryAcquire protected final boolean tryAcquire(int acquires) { // 調用 return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 獲取當前狀態值 int c = getState(); // 若是當前狀態值爲0,若是爲0表示當前鎖空閒 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 看看當前的線程是否是鎖的持有者 else if (current == getExclusiveOwnerThread()) { // 若是是的話 將狀態設置爲 c + acquires int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
仍是很好理解的哈,先看看鎖的狀態值是啥?
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法,被置入AQS阻塞隊列中。這裏非公平體如今獲取鎖的時候,沒有查看當前AQS隊列中是否有比本身更早請求該鎖的線程存在,而是採起了搶奪策略。
公平鎖的tryAcquire實現以下:
//FairSync#tryAcquire protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 狀態值爲0的時候,我去看看隊列裏面在我以前有沒有線程在等 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 當前鎖已經被當前線程持有 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
對比一下兩種策略,沒必要說,hasQueuedPredecessors
方法必定是實現公平性的核心,咱們來瞅瞅:
// 若是當前線程有前驅節點就返回true。 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
該方法:若是當前線程有前驅節點就返回true,那麼咱們想,不是前驅節點的狀況有哪些呢?
知道這些以後,咱們就明白最後那串表達式是什麼意思了:隊列裏面的第一個元素不是當前線程,返回true,說明在你以前還有人排着隊呢,你先別搶,先到先得。
咱們稍微總結一下:
Reentrant類的構造函數接受一個可選的公平性參數fair。這時候就出現兩種選擇:
公平鎖每每體現出的整體吞吐量比非公平鎖要低,也就是更慢,由於每次都須要看看隊列裏面有沒有在排隊的嘛。鎖的公平性並不保證線程調度的公平性,但公平鎖可以減小"飢餓"發生的機率。
須要注意的是:不定時的tryLock()方法不支持公平性設置。若是鎖可用,即便其餘線程等待時間比它長,它也會成功得到鎖。
該方法與lock方法相似,不一樣點在於,它能對中斷進行相應:當前線程在調用該方法時,若是其餘線程調用了當前線程的interrupt()方法,當前線程會拋出InterruptedException異常,而後返回。
// ReentrantLock#lockInterruptibly public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } // AQS#acquireInterruptibly public final void acquireInterruptibly(int arg) throws InterruptedException { // 若是當前線程被中斷,則直接拋出異常 if (Thread.interrupted()) throw new InterruptedException(); // 嘗試獲取資源 if (!tryAcquire(arg)) // 調用AQS可被中斷的方法 doAcquireInterruptibly(arg); }
嘗試獲取鎖,若是當前該鎖沒有被其餘線程持有,則當前線程獲取該鎖並返回true,不然返回false。
大體邏輯和非公平鎖lock方法相似,但該方法會直接返回獲取鎖的結果,不管true或者false,它不會阻塞。
// ReentrantLock# tryLock public boolean tryLock() { return sync.nonfairTryAcquire(1); } abstract static class Sync extends AbstractQueuedSynchronizer { // Sync#nonfairTryAcquire final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
tryLock()
實現方法,在實現時,但願能快速的得到是否可以得到到鎖,所以即便在設置爲 fair = true
( 使用公平鎖 ),依然調用 Sync#nonfairTryAcquire(int acquires)
方法。tryLock()
仍是按照是否公平鎖的方式來,能夠調用 #tryLock(0, TimeUnit)
方法來實現。// ReentrantLock# tryLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } // AQS#tryAcquireNanos public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
嘗試獲取鎖,若是獲取失敗會將當前線程掛起指定時間,時間到了以後當前線程被激活,若是仍是沒有獲取到鎖,就返回false。
另外,該方法會對中斷進行的響應,若是其餘線程調用了當前線程的interrupt()方法,響應中斷,拋出異常。
// ReentrantLock#unlock public void unlock() { sync.release(1); } //AQS# release public final boolean release(int arg) { // 子類實現tryRelease if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } abstract static class Sync extends AbstractQueuedSynchronizer { // Sync#tryRelease protected final boolean tryRelease(int releases) { // 計算解鎖後的次數,默認減1 int c = getState() - releases; // 若是想要解鎖的人不是當前的鎖持有者,直接拋異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 可重入次數爲0,清空鎖持有線程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 可重入次數還沒到0,只須要改變一下下state就可 setState(c); return free; } }
嘗試釋放鎖,若是當前線程持有該鎖,調用該方法默認會讓AQS的state減1。
若是減1以後,state爲0,當前線程會釋放鎖。
若是當前線程不是鎖持有者而企圖調用該方法,則拋出IllegalMonitorStateException異常。
Condition是用來代替傳統Object中的wait()和notify()實現線程間的協做,Condition的await()和signal()用於處理線程間協做更加安全與高效。
Condition的使用必須在lock()與unlock()之間使用,且只能經過lock.newCondition()獲取,實現原理咱們以後會專門進行學習。
public class BlockingQueue { final Object[] items; // 緩衝數組 final ReentrantLock lock = new ReentrantLock(); // 非公平獨佔鎖 final Condition notFull = lock.newCondition(); // 未滿條件 final Condition notEmpty = lock.newCondition(); // 未空條件 private int putIdx; // 添加操做的指針 private int takeIdx; // 獲取操做的指針 private int count; // 隊列中元素個數 public BlockingQueue(int capacity) { if(capacity < 0) throw new IllegalArgumentException(); items = new Object[capacity]; } // 插入 public void put(Object item) throws InterruptedException { try { lock.lock(); // 上鎖 while (items.length == count) { // 滿了 notFull.await(); // 其餘插入線程阻塞起來 } enqueue(item); // 沒滿就能夠入隊 } finally { lock.unlock(); // 不要忘記解鎖 } } private void enqueue(Object item) { items[putIdx] = item; if (++putIdx == items.length) putIdx = 0; count++; notEmpty.signal(); // 叫醒獲取的線程 } // 獲取 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await();// 阻塞其餘獲取線程 } return dequeue(); } finally { lock.unlock(); } } private Object dequeue() { Object x = items[takeIdx]; items[takeIdx] = null; if (++takeIdx == items.length) takeIdx = 0; count--; notFull.signal(); // 叫醒其餘的插入線程 return x; } }
其實上面就是ArrayBlockingQueue刪減版的部分實現,感興趣的小夥伴能夠看看源碼的實現,源碼上面針對併發還作了更細節的處理。
API層面的獨佔鎖:ReentrantLock是底層使用AQS實現的可重入的獨佔鎖,區別於synchronized原生語法層面實現鎖語義,ReetrantLock經過lock()
和unlock()
兩個方法顯式地實現互斥鎖。
state與可重入:AQS的state爲0表示當前鎖空閒,大於0表示該鎖已經被佔用,某一時刻只有一個線程能夠獲取該鎖。可重入性是經過判斷持鎖線程是否是當前線程,若是是,state+1,釋放鎖時,state-1,爲0時表示完全釋放。
公平與非公平策略:ReentrantLock擁有公平和非公平兩種策略,區別在於獲取鎖的時候是否會去檢查阻塞隊列中,是否存在當前線程的前驅節點,默認是非公平鎖策略。
豐富的鎖擴展:提供了響應中斷的獲取鎖方式lockInterruptibly,以及提供了快速響應的tryLock方法,及超時獲取等等方法。
condition:TODO一個ReentrantLock對象能夠經過newCondition()同時綁定多個Condition對象,對線程的等待、喚醒操做更加詳細和靈活,這一點咱們以後說到Condition的時候會再回過頭說的。