| 好看請贊,養成習慣html
- 你有一個思想,我有一個思想,咱們交換後,一我的就有兩個思想
- If you can NOT explain it simply, you do NOT understand it well enough
現陸續將Demo代碼和技術文章整理在一塊兒 Github實踐精選 ,方便你們閱讀查看,本文一樣收錄在此,以爲不錯,還請Starjava
進入源碼階段了,寫了十幾篇的 併發系列 知識鋪墊終於要派上用場了。相信不少人已經忘了其中的一些理論知識,別擔憂,我會在源碼環節帶入相應的理論知識點幫助你們回憶,作到理論與實踐相結合,另外這是超長圖文,建議收藏,若是對你有用還請點贊讓更多人看到node
曾幾什麼時候幻想過,若是 Java 併發控制只有 synchronized 多好,只有下面三種使用方式,簡單方便git
public class ThreeSync { private static final Object object = new Object(); public synchronized void normalSyncMethod(){ //臨界區 } public static synchronized void staticSyncMethod(){ //臨界區 } public void syncBlockMethod(){ synchronized (object){ //臨界區 } } }
若是在 Java 1.5以前,確實是這樣,自從 1.5 版本 Doug Lea 大師就從新造了一個輪子 Lock程序員
咱們常說:「避免重複造輪子」,若是有了輪子仍是要堅持再造個輪子,那麼確定傳統的輪子在某些應用場景中不能很好的解決問題github
不知你是否還記得 Coffman 總結的四個能夠發生死鎖的情形 ,其中【不可剝奪條件】是指:面試
線程已經得到資源,在未使用完以前,不能被剝奪,只能在使用完時本身釋放
要想破壞這個條件,就須要具備申請不到進一步資源就釋放已有資源的能力 算法
很顯然,這個能力是 synchronized 不具有的,使用 synchronized ,若是線程申請不到資源就會進入阻塞狀態,咱們作什麼也改變不了它的狀態,這是 synchronized 輪子的致命弱點,這就強有力的給了重造輪子 Lock 的理由編程
舊輪子有弱點,新輪子就要解決這些問題,因此要具有不會阻塞的功能,下面的三個方案都是解決這個問題的好辦法(看下面表格描述你就明白三個方案的含義了)設計模式
特性 | 描述 | API |
---|---|---|
能響應中斷 | 若是不能本身釋放,那能夠響應中斷也是很好的。Java多線程中斷機制 專門描述了中斷過程,目的是經過中斷信號來跳出某種狀態,好比阻塞 | lockInterruptbly() |
非阻塞式的獲取鎖 | 嘗試獲取,獲取不到不會阻塞,直接返回 | tryLock() |
支持超時 | 給定一個時間限制,若是一段時間內沒獲取到,不是進入阻塞狀態,一樣直接返回 | tryLock(long time, timeUnit) |
好的方案有了,但魚和熊掌不可兼得,Lock 多了 synchronized 不具有的特性,天然不會像 synchronized 那樣一個關鍵字三個玩法走遍全天下,在使用上也相對複雜了一丟丟
synchronized 有標準用法,這樣的優良傳統咱 Lock 也得有,相信不少人都知道使用 Lock 的一個範式
Lock lock = new ReentrantLock(); lock.lock(); try{ ... }finally{ lock.unlock(); }
既然是範式(沒事不要挑戰更改寫法的那種),確定有其理由,咱們來看一下
這個你們應該都會明白,在 finally 中釋放鎖,目的是保證在獲取到鎖以後,最終能被釋放
不知道你有沒有想過,爲何會有標準 2 的存在,咱們一般是「喜歡」 try 住全部內容,生怕發生異常不能捕獲的
在 try{}
外獲取鎖主要考慮兩個方面:
不一樣鎖的實現方式略有不一樣,範式的存在就是要避免一切問題的出現,因此你們儘可能遵照範式
若是你熟悉 synchronized,你知道程序編譯成 CPU 指令後,在臨界區會有 moniterenter
和 moniterexit
指令的出現,能夠理解成進出臨界區的標識
從範式上來看:
lock.lock()
獲取鎖,「等同於」 synchronized 的 moniterenter指令lock.unlock()
釋放鎖,「等同於」 synchronized 的 moniterexit 指令那 Lock 是怎麼作到的呢?
這裏先簡單說明一下,這樣一會到源碼分析時,你能夠遠觀設計輪廓,近觀實現細節,會變得愈加輕鬆
其實很簡單,好比在 ReentrantLock 內部維護了一個 volatile 修飾的變量 state,經過 CAS 來進行讀寫(最底層仍是交給硬件來保證原子性和可見性),若是CAS更改爲功,即獲取到鎖,線程進入到 try 代碼塊繼續執行;若是沒有更改爲功,線程會被【掛起】,不會向下執行
但 Lock 是一個接口,裏面根本沒有 state 這個變量的存在:
它怎麼處理這個 state 呢?很顯然須要一點設計的加成了,接口定義行爲,具體都是須要實現類的
Lock 接口的實現類基本都是經過【聚合】了一個【隊列同步器】的子類完成線程訪問控制的
那什麼是隊列同步器呢? (這應該是你見過的最強標題黨,聊了半個世紀才入正題,評論區留言罵我)
隊列同步器 (AbstractQueuedSynchronizer),簡稱同步器或AQS,就是咱們今天的主人公
問:爲何你分析 JUC 源碼,要從 AQS 提及呢?答:看下圖
相信看到這個截圖你就明白一二了,你聽過的,面試常被問起的,工做中經常使用的
ReentrantLock
ReentrantReadWriteLock
Semaphore(信號量)
CountDownLatch
公平鎖
非公平鎖
ThreadPoolExecutor
(關於線程池的理解,能夠查看 爲何要使用線程池? )都和 AQS 有直接關係,因此瞭解 AQS 的抽象實現,在此基礎上再稍稍查看上述各種的實現細節,很快就能夠所有搞定,不至於查看源碼時一頭霧水,丟失主線
上面提到,在鎖的實現類中會聚合同步器,而後利同步器實現鎖的語義,那麼問題來了:
爲何要用聚合模式,怎麼進一步理解鎖和同步器的關係呢?
咱們絕大多數都是在使用鎖,實現鎖以後,其核心就是要使用方便
從 AQS 的類名稱和修飾上來看,這是一個抽象類,因此從設計模式的角度來看同步器必定是基於【模版模式】來設計的,使用者須要繼承同步器,實現自定義同步器,並重寫指定方法,隨後將同步器組合在自定義的同步組件中,並調用同步器的模版方法,而這些模版方法又回調用使用者重寫的方法
我不想將上面的解釋說的這麼抽象,其實想理解上面這句話,咱們只須要知道下面兩個問題就行了
同步器提供的可重寫方法只有5個,這大大方便了鎖的使用者:
按理說,須要重寫的方法也應該有 abstract 來修飾的,爲何這裏沒有?緣由其實很簡單,上面的方法我已經用顏色區分紅了兩類:
獨佔式
共享式
自定義的同步組件或者鎖不可能既是獨佔式又是共享式,爲了不強制重寫不相干方法,因此就沒有 abstract 來修飾了,但要拋出異常告知不能直接使用該方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
暖暖的很貼心(若是你有相似的需求也能夠仿照這樣的設計)
表格方法描述中所說的同步狀態
就是上文提到的有 volatile 修飾的 state,因此咱們在重寫
上面幾個方法時,還要經過同步器提供的下面三個方法(AQS 提供的)來獲取或修改同步狀態:
而獨佔式和共享式操做 state 變量的區別也就很簡單了
因此你看到的 ReentrantLock
ReentrantReadWriteLock
Semaphore(信號量)
CountDownLatch
這幾個類其實僅僅是在實現以上幾個方法上略有差異,其餘的實現都是經過同步器的模版方法來實現的,到這裏是否是心情放鬆了許多呢?咱們來看一看模版方法:
上面咱們將同步器的實現方法分爲獨佔式和共享式兩類,模版方法其實除了提供以上兩類模版方法以外,只是多了響應中斷
和超時限制
的模版方法供 Lock 使用,來看一下
先不用記上述方法的功能,目前你只須要了解個大概功能就好。另外,相信你也注意到了:
上面的方法都有 final 關鍵字修飾,說明子類不能重寫這個方法
看到這你也許有點亂了,咱們稍微概括一下:
程序員仍是看代碼內心踏實一點,咱們再來用代碼說明一下上面的關係(注意代碼中的註釋,如下的代碼並非很嚴謹,只是爲了簡單說明上圖的代碼實現):
package top.dayarch.myjuc; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 自定義互斥鎖 * * @author tanrgyb * @date 2020/5/23 9:33 PM */ public class MyMutex implements Lock { // 靜態內部類-自定義同步器 private static class MySync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { // 調用AQS提供的方法,經過CAS保證原子性 if (compareAndSetState(0, arg)){ // 咱們實現的是互斥鎖,因此標記獲取到同步狀態(更新state成功)的線程, // 主要爲了判斷是否可重入(一下子會說明) setExclusiveOwnerThread(Thread.currentThread()); //獲取同步狀態成功,返回 true return true; } // 獲取同步狀態失敗,返回 false return false; } @Override protected boolean tryRelease(int arg) { // 未擁有鎖卻讓釋放,會拋出IMSE if (getState() == 0){ throw new IllegalMonitorStateException(); } // 能夠釋放,清空排它線程標記 setExclusiveOwnerThread(null); // 設置同步狀態爲0,表示釋放鎖 setState(0); return true; } // 是否獨佔式持有 @Override protected boolean isHeldExclusively() { return getState() == 1; } // 後續會用到,主要用於等待/通知機制,每一個condition都有一個與之對應的條件等待隊列,在鎖模型中說明過 Condition newCondition() { return new ConditionObject(); } } // 聚合自定義同步器 private final MySync sync = new MySync(); @Override public void lock() { // 阻塞式的獲取鎖,調用同步器模版方法獨佔式,獲取同步狀態 sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { // 調用同步器模版方法可中斷式獲取同步狀態 sync.acquireInterruptibly(1); } @Override public boolean tryLock() { // 調用本身重寫的方法,非阻塞式的獲取同步狀態 return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // 調用同步器模版方法,可響應中斷和超時時間限制 return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { // 釋放鎖 sync.release(1); } @Override public Condition newCondition() { // 使用自定義的條件 return sync.newCondition(); } }
若是你如今打開 IDE, 你會發現上文提到的 ReentrantLock
ReentrantReadWriteLock
Semaphore(信號量)
CountDownLatch
都是按照這個結構實現,因此咱們就來看一看 AQS 的模版方法究竟是怎麼實現鎖
從上面的代碼中,你應該理解了lock.tryLock()
非阻塞式獲取鎖就是調用自定義同步器重寫的 tryAcquire()
方法,經過 CAS 設置state 狀態,無論成功與否都會立刻返回;那麼 lock.lock() 這種阻塞式的鎖是如何實現的呢?
有阻塞就須要排隊,實現排隊必然須要隊列
CLH:Craig、Landin and Hagersten 隊列,是一個單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO)——概念瞭解就好,不要記
隊列中每一個排隊的個體就是一個 Node,因此咱們來看一下 Node 的結構
AQS 內部維護了一個同步隊列,用於管理同步狀態。
爲了將上述步驟弄清楚,咱們須要來看一看 Node 結構 (若是你能打開 IDE 一塊兒看那是極好的)
乍一看有點雜亂,咱們仍是將其歸類說明一下:
上面這幾個狀態說明有個印象就好,有了Node 的結構說明鋪墊,你也就能想象同步隊列的接本結構了:
前置知識基本鋪墊完畢,咱們來看一看獨佔式獲取同步狀態的整個過程
故事要從範式lock.lock() 開始
public void lock() { // 阻塞式的獲取鎖,調用同步器模版方法,獲取同步狀態 sync.acquire(1); }
進入AQS的模版方法 acquire()
public final void acquire(int arg) { // 調用自定義同步器重寫的 tryAcquire 方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先,也會嘗試非阻塞的獲取同步狀態,若是獲取失敗(tryAcquire返回false),則會調用 addWaiter
方法構造 Node 節點(Node.EXCLUSIVE 獨佔式)並安全的(CAS)加入到同步隊列【尾部】
private Node addWaiter(Node mode) { // 構造Node節點,包含當前線程信息以及節點模式【獨佔/共享】 Node node = new Node(Thread.currentThread(), mode); // 新建變量 pred 將指針指向tail指向的節點 Node pred = tail; // 若是尾節點不爲空 if (pred != null) { // 新加入的節點前驅節點指向尾節點 node.prev = pred; // 由於若是多個線程同時獲取同步狀態失敗都會執行這段代碼 // 因此,經過 CAS 方式確保安全的設置當前節點爲最新的尾節點 if (compareAndSetTail(pred, node)) { // 曾經的尾節點的後繼節點指向當前節點 pred.next = node; // 返回新構建的節點 return node; } } // 尾節點爲空,說明當前節點是第一個被加入到同步隊列中的節點 // 須要一個入隊操做 enq(node); return node; } private Node enq(final Node node) { // 經過「死循環」確保節點被正確添加,最終將其設置爲尾節點以後纔會返回,這裏使用 CAS 的理由和上面同樣 for (;;) { Node t = tail; // 第一次循環,若是尾節點爲 null if (t == null) { // Must initialize // 構建一個哨兵節點,並將頭部指針指向它 if (compareAndSetHead(new Node())) // 尾部指針一樣指向哨兵節點 tail = head; } else { // 第二次循環,將新節點的前驅節點指向t node.prev = t; // 將新節點加入到隊列尾節點 if (compareAndSetTail(t, node)) { // 前驅節點的後繼節點指向當前新節點,完成雙向隊列 t.next = node; return t; } } } }
你可能比較迷惑 enq() 的處理方式,進入該方法就是一個「死循環」,咱們就用圖來描述它是怎樣跳出循環的
有些同窗可能會有疑問,爲何會有哨兵節點?
哨兵,顧名思義,是用來解決國家之間邊界問題的,不直接參與生產活動。一樣,計算機科學中提到的哨兵,也用來解決邊界問題,若是沒有邊界,指定環節,按照一樣算法可能會在邊界處發生異常,好比要繼續向下分析的
acquireQueued()
方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // "死循環",嘗試獲取鎖,或者掛起 for (;;) { // 獲取當前節點的前驅節點 final Node p = node.predecessor(); // 只有當前節點的前驅節點是頭節點,纔會嘗試獲取鎖 // 看到這你應該理解添加哨兵節點的含義了吧 if (p == head && tryAcquire(arg)) { // 獲取同步狀態成功,將本身設置爲頭 setHead(node); // 將哨兵節點的後繼節點置爲空,方便GC p.next = null; // help GC failed = false; // 返回中斷標識 return interrupted; } // 當前節點的前驅節點不是頭節點 //【或者】當前節點的前驅節點是頭節點但獲取同步狀態失敗 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
獲取同步狀態成功會返回能夠理解了,可是若是失敗就會一直陷入到「死循環」中浪費資源嗎?很顯然不是,shouldParkAfterFailedAcquire(p, node)
和 parkAndCheckInterrupt()
就會將線程獲取同步狀態失敗的線程掛起,咱們繼續向下看
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取前驅節點的狀態 int ws = pred.waitStatus; // 若是是 SIGNAL 狀態,即等待被佔用的資源釋放,直接返回 true // 準備繼續調用 parkAndCheckInterrupt 方法 if (ws == Node.SIGNAL) return true; // ws 大於0說明是CANCELLED狀態, if (ws > 0) { // 循環判斷前驅節點的前驅節點是否也爲CANCELLED狀態,忽略該狀態的節點,從新鏈接隊列 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 將當前節點的前驅節點設置爲設置爲 SIGNAL 狀態,用於後續喚醒操做 // 程序第一次執行到這返回爲false,還會進行外層第二次循環,最終從代碼第7行返回 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
到這裏你也許有個問題:
這個地方設置前驅節點爲 SIGNAL 狀態到底有什麼做用?
保留這個問題,咱們陸續揭曉
若是前驅節點的 waitStatus 是 SIGNAL狀態,即 shouldParkAfterFailedAcquire 方法會返回 true ,程序會繼續向下執行 parkAndCheckInterrupt
方法,用於將當前線程掛起
private final boolean parkAndCheckInterrupt() { // 線程掛起,程序不會繼續向下執行 LockSupport.park(this); // 根據 park 方法 API描述,程序在下述三種狀況會繼續向下執行 // 1. 被 unpark // 2. 被中斷(interrupt) // 3. 其餘不合邏輯的返回纔會繼續向下執行 // 因上述三種狀況程序執行至此,返回當前線程的中斷狀態,並清空中斷狀態 // 若是因爲被中斷,該方法會返回 true return Thread.interrupted(); }
被喚醒的程序會繼續執行 acquireQueued
方法裏的循環,若是獲取同步狀態成功,則會返回 interrupted = true
的結果
程序繼續向調用棧上層返回,最終回到 AQS 的模版方法 acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
你也許會有疑惑:
程序已經成功獲取到同步狀態並返回了,怎麼會有個自我中斷呢?
static void selfInterrupt() { Thread.currentThread().interrupt(); }
若是你不能理解中斷,強烈建議你回看 Java多線程中斷機制
到這裏關於獲取同步狀態咱們還遺漏了一條線,acquireQueued 的 finally 代碼塊若是你仔細看你也許立刻就會有疑惑:
到底什麼狀況纔會執行 if(failed) 裏面的代碼 ?
if (failed) cancelAcquire(node);
這段代碼被執行的條件是 failed 爲 true,正常狀況下,若是跳出循環,failed 的值爲false,若是不能跳出循環貌似怎麼也不能執行到這裏,因此只有不正常的狀況纔會執行到這裏,也就是會發生異常,纔會執行到此處
查看 try 代碼塊,只有兩個方法會拋出異常:
node.processor()
方法tryAcquire()
方法先看前者:
很顯然,這裏拋出的異常不是重點,那就以 ReentrantLock 重寫的 tryAcquire() 方法爲例
另外,上面分析 shouldParkAfterFailedAcquire
方法還對 CANCELLED 的狀態進行了判斷,那麼
何時會生成取消狀態的節點呢?
答案就在 cancelAcquire
方法中, 咱們來看看 cancelAcquire到底怎麼設置/處理 CANNELLED 的
private void cancelAcquire(Node node) { // 忽略無效節點 if (node == null) return; // 將關聯的線程信息清空 node.thread = null; // 跳過一樣是取消狀態的前驅節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 跳出上面循環後找到前驅有效節點,並獲取該有效節點的後繼節點 Node predNext = pred.next; // 將當前節點的狀態置爲 CANCELLED node.waitStatus = Node.CANCELLED; // 若是當前節點處在尾節點,直接從隊列中刪除本身就好 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // 1. 若是當前節點的有效前驅節點不是頭節點,也就是說當前節點不是頭節點的後繼節點 if (pred != head && // 2. 判斷當前節點有效前驅節點的狀態是否爲 SIGNAL ((ws = pred.waitStatus) == Node.SIGNAL || // 3. 若是不是,嘗試將前驅節點的狀態置爲 SIGNAL (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 判斷當前節點有效前驅節點的線程信息是否爲空 pred.thread != null) { // 上述條件知足 Node next = node.next; // 將當前節點有效前驅節點的後繼節點指針指向當前節點的後繼節點 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 若是當前節點的前驅節點是頭節點,或者上述其餘條件不知足,就喚醒當前節點的後繼節點 unparkSuccessor(node); } node.next = node; // help GC }
看到這個註釋你可能有些亂了,其核心目的就是從等待隊列中移除 CANCELLED 的節點,並從新拼接整個隊列,總結來看,其實設置 CANCELLED 狀態節點只是有三種狀況,咱們經過畫圖來分析一下:
至此,獲取同步狀態的過程就結束了,咱們簡單的用流程圖說明一下整個過程
獲取鎖的過程就這樣的結束了,先暫停幾分鐘整理一下本身的思路。咱們上面尚未說明 SIGNAL 的做用, SIGNAL 狀態信號究竟是幹什麼用的?這就涉及到鎖的釋放了,咱們來繼續瞭解,總體思路和鎖的獲取是同樣的, 可是釋放過程就相對簡單不少了
故事要從 unlock() 方法提及
public void unlock() { // 釋放鎖 sync.release(1); }
調用 AQS 模版方法 release,進入該方法
public final boolean release(int arg) { // 調用自定義同步器重寫的 tryRelease 方法嘗試釋放同步狀態 if (tryRelease(arg)) { // 釋放成功,獲取頭節點 Node h = head; // 存在頭節點,而且waitStatus不是初始狀態 // 經過獲取的過程咱們已經分析了,在獲取的過程當中會將 waitStatus的值從初始狀態更新成 SIGNAL 狀態 if (h != null && h.waitStatus != 0) // 解除線程掛起狀態 unparkSuccessor(h); return true; } return false; }
查看 unparkSuccessor 方法,實際是要喚醒頭節點的後繼節點
private void unparkSuccessor(Node node) { // 獲取頭節點的waitStatus int ws = node.waitStatus; if (ws < 0) // 清空頭節點的waitStatus值,即置爲0 compareAndSetWaitStatus(node, ws, 0); // 獲取頭節點的後繼節點 Node s = node.next; // 判斷當前節點的後繼節點是不是取消狀態,若是是,須要移除,從新鏈接隊列 if (s == null || s.waitStatus > 0) { s = null; // 從尾節點向前查找,找到隊列第一個waitStatus狀態小於0的節點 for (Node t = tail; t != null && t != node; t = t.prev) // 若是是獨佔式,這裏小於0,其實就是 SIGNAL if (t.waitStatus <= 0) s = t; } if (s != null) // 解除線程掛起狀態 LockSupport.unpark(s.thread); }
有同窗可能有疑問:
爲何這個地方是從隊列尾部向前查找不是 CANCELLED 的節點?
緣由有兩個:
第一,先回看節點加入隊列的情景:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
節點入隊並非原子操做,代碼第六、7行
node.prev = pred; compareAndSetTail(pred, node)
這兩個地方能夠看做是尾節點入隊的原子操做,若是此時代碼還沒執行到 pred.next = node; 這時又恰巧執行了unparkSuccessor方法,就沒辦法從前日後找了,由於後繼指針尚未鏈接起來,因此須要從後往前找
第二點緣由,在上面圖解產生 CANCELLED 狀態節點的時候,先斷開的是 Next 指針,Prev指針並未斷開,所以這也是必需要從後往前遍歷纔可以遍歷徹底部的Node
同步狀態至此就已經成功釋放了,以前獲取同步狀態被掛起的線程就會被喚醒,繼續從下面代碼第 3 行返回執行:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
繼續返回上層調用棧, 從下面代碼15行開始執行,從新執行循環,再次嘗試獲取同步狀態
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
到這裏,關於獨佔式獲取/釋放鎖的流程已經閉環了,可是關於 AQS 的另外兩個模版方法尚未介紹
響應中斷
超時限制
故事要從lock.lockInterruptibly() 方法提及
public void lockInterruptibly() throws InterruptedException { // 調用同步器模版方法可中斷式獲取同步狀態 sync.acquireInterruptibly(1); }
有了前面的理解,理解獨佔式可響應中斷的獲取同步狀態方式,真是一眼就能明白了:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 嘗試非阻塞式獲取同步狀態失敗,若是沒有獲取到同步狀態,執行代碼7行 if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
繼續查看 doAcquireInterruptibly
方法:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 獲取中斷信號後,再也不返回 interrupted = true 的值,而是直接拋出 InterruptedException throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
沒想到 JDK 內部也有如此相近的代碼,可響應中斷獲取鎖沒什麼深奧的,就是被中斷拋出 InterruptedException 異常(代碼第17行),這樣就逐層返回上層調用棧捕獲該異常進行下一步操做了
趁熱打鐵,來看看另一個模版方法:
這個很好理解,就是給定一個時限,在該時間段內獲取到同步狀態,就返回 true, 不然,返回 false。比如線程給本身定了一個鬧鐘,鬧鈴一響,線程就本身返回了,這就不會使本身是阻塞狀態了
既然涉及到超時限制,其核心邏輯確定是計算時間間隔,由於在超時時間內,確定是屢次嘗試獲取鎖的,每次獲取鎖確定有時間消耗,因此計算時間間隔的邏輯就像咱們在程序打印程序耗時 log 那麼簡單
nanosTimeout = deadline - System.nanoTime()
故事要從 lock.tryLock(time, unit)
方法提及
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // 調用同步器模版方法,可響應中斷和超時時間限制 return sync.tryAcquireNanos(1, unit.toNanos(time)); }
來看 tryAcquireNanos 方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
是否是和上面 acquireInterruptibly
方法長相很詳細了,繼續查看來 doAcquireNanos 方法,看程序, 該方法也是 throws InterruptedException,咱們在中斷文章中說過,方法標記上有 throws InterruptedException
說明該方法也是能夠響應中斷的,因此你能夠理解超時限制是 acquireInterruptibly
方法的增強版,具備超時和非阻塞控制的雙保險
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 超時時間內,爲獲取到同步狀態,直接返回false if (nanosTimeout <= 0L) return false; // 計算超時截止時間 final long deadline = System.nanoTime() + nanosTimeout; // 以獨佔方式加入到同步隊列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 計算新的超時時間 nanosTimeout = deadline - System.nanoTime(); // 若是超時,直接返回 false if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && // 判斷是最新超時時間是否大於閾值 1000 nanosTimeout > spinForTimeoutThreshold) // 掛起線程 nanosTimeout 長時間,時間到,自動返回 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上面的方法應該不是很難懂,可是又同窗可能在第 27 行上有所困惑
爲何 nanosTimeout 和 自旋超時閾值1000進行比較?
/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;
其實 doc 說的很清楚,說白了,1000 nanoseconds 時間已經很是很是短暫了,不必再執行掛起和喚醒操做了,不如直接當前線程直接進入下一次循環
到這裏,咱們自定義的 MyMutex 只差 Condition 沒有說明了,不知道你累了嗎?我還在堅持
若是你看過以前寫的 併發編程之等待通知機制 ,你應該對下面這個圖是有印象的:
若是當時你理解了這個模型,再看 Condition 的實現,根本就不是問題了,首先 Condition 仍是一個接口,確定也是須要有實現類的
那故事就從 lock.newnewCondition
提及吧
public Condition newCondition() { // 使用自定義的條件 return sync.newCondition(); }
自定義同步器重封裝了該方法:
Condition newCondition() { return new ConditionObject(); }
ConditionObject 就是 Condition 的實現類,該類就定義在了 AQS 中,只有兩個成員變量:
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
因此,咱們只須要來看一下 ConditionObject 實現的 await / signal 方法來使用這兩個成員變量就能夠了
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 一樣構建 Node 節點,並加入到等待隊列中 Node node = addConditionWaiter(); // 釋放同步狀態 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 掛起當前線程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
這裏注意用詞,在介紹獲取同步狀態時,addWaiter 是加入到【同步隊列】,就是上圖說的入口等待隊列,這裏說的是【等待隊列】,因此 addConditionWaiter 確定是構建了一個本身的隊列:
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 新構建的節點的 waitStatus 是 CONDITION,注意不是 0 或 SIGNAL 了 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 構建單向同步隊列 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
這裏有朋友可能會有疑問:
爲何這裏是單向隊列,也沒有使用CAS 來保證加入隊列的安全性呢?
由於 await 是 Lock 範式 try 中使用的,說明已經獲取到鎖了,因此就不必使用 CAS 了,至因而單向,由於這裏還不涉及到競爭鎖,只是作一個條件等待隊列
在 Lock 中能夠定義多個條件,每一個條件都會對應一個 條件等待隊列,因此將上圖豐富說明一下就變成了這個樣子:
線程已經按相應的條件加入到了條件等待隊列中,那如何再嘗試獲取鎖呢?signal / signalAll 方法就已經排上用場了
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
Signal 方法經過調用 doSignal 方法,只喚醒條件等待隊列中的第一個節點
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 調用該方法,將條件等待隊列的線程節點移動到同步隊列中 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
繼續看 transferForSignal
方法
final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 從新進行入隊操做 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 喚醒同步隊列中該線程 LockSupport.unpark(node.thread); return true; }
因此咱們再用圖解一下喚醒的整個過程
到這裏,理解 signalAll 就很是簡單了,只不過循環判斷是否還有 nextWaiter,若是有就像 signal 操做同樣,將其從條件等待隊列中移到同步隊列中
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
不知你仍是否記得,我在併發編程之等待通知機制 中還說過一句話
沒有特殊緣由儘可能用 signalAll 方法
何時能夠用 signal 方法也在其中作了說明,請你們自行查看吧
這裏我還要多說一個細節,從條件等待隊列移到同步隊列是有時間差的,因此使用 await() 方法也是範式的, 一樣在該文章中作了解釋
有時間差,就會有公平和不公平的問題,想要全面瞭解這個問題,咱們就要走近 ReentrantLock 中來看了,除了瞭解公平/不公平問題,查看 ReentrantLock 的應用仍是要反過來驗證它使用的AQS的,咱們繼續吧
獨佔式的典型應用就是 ReentrantLock 了,咱們來看看它是如何重寫這個方法的
乍一看挺奇怪的,怎麼裏面自定義了三個同步器:其實 NonfairSync,FairSync 只是對 Sync 作了進一步劃分:
從名稱上你應該也知道了,這就是你聽到過的 公平鎖/非公平鎖
了
生活中,排隊講求先來後到視爲公平。程序中的公平性也是符合請求鎖的絕對時間的,其實就是 FIFO,不然視爲不公平
咱們來對比一下 ReentrantLock 是如何實現公平鎖和非公平鎖的
其實沒什麼大不了,公平鎖就是判斷同步隊列是否還有先驅節點的存在,只有沒有先驅節點才能獲取鎖;而非公平鎖是無論這個事的,能獲取到同步狀態就能夠,就這麼簡單,那問題來了:
考慮這個問題,咱們需從新回憶上面的鎖獲取實現圖了,其實上面我已經透露了一點
主要有兩點緣由:
恢復掛起的線程到真正鎖的獲取仍是有時間差的,從人類的角度來看這個時間微乎其微,可是從CPU的角度來看,這個時間差存在的仍是很明顯的。因此非公平鎖能更充分的利用 CPU 的時間片,儘可能減小 CPU 空閒狀態時間
不知你是否還記得我在 面試問,建立多少個線程合適? 文章中反覆提到過,使用多線程很重要的考量點是線程切換的開銷,想象一下,若是採用非公平鎖,當一個線程請求鎖獲取同步狀態,而後釋放同步狀態,由於不須要考慮是否還有前驅節點,因此剛釋放鎖的線程在此刻再次獲取同步狀態的概率就變得很是大,因此就減小了線程的開銷
相信到這裏,你也就明白了,爲何 ReentrantLock 默認構造器用的是非公平鎖同步器
public ReentrantLock() { sync = new NonfairSync(); }
看到這裏,感受非公平鎖 perfect,非也,有得必有失
使用公平鎖會有什麼問題?
公平鎖保證了排隊的公平性,非公平鎖霸氣的忽視這個規則,因此就有可能致使排隊的長時間在排隊,也沒有機會獲取到鎖,這就是傳說中的 「飢餓」
相信到這裏,答案已經在你心中了,若是爲了更高的吞吐量,很顯然非公平鎖是比較合適的,由於節省不少線程切換時間,吞吐量天然就上去了,不然那就用公平鎖還你們一個公平
咱們還差最後一個環節,真的要挺住
到這裏,咱們還沒分析 ReentrantLock 的名字,JDK 起名這麼有講究,確定有其含義,直譯過來【可重入鎖】
爲何要支持鎖的重入?
試想,若是是一個有 synchronized 修飾的遞歸調用方法,程序第二次進入被本身阻塞了豈不是很大的笑話,因此 synchronized 是支持鎖的重入的
Lock 是新輪子,天然也要支持這個功能,其實現也很簡單,請查看公平鎖和非公平鎖對比圖,其中有一段代碼:
// 判斷當前線程是否和已佔用鎖的線程是同一個 else if (current == getExclusiveOwnerThread())
仔細看代碼, 你也許發現,我前面的一個說明是錯誤的,我要從新解釋一下
重入的線程會一直將 state + 1, 釋放鎖會 state - 1直至等於0,上面這樣寫也是想幫助你們快速的區分
本文是一個長文,說明了爲何要造 Lock 新輪子,如何標準的使用 Lock,AQS 是什麼,是如何實現鎖的,結合 ReentrantLock 反推 AQS 中的一些應用以及其獨有的一些特性
獨佔式獲取鎖就這樣介紹完了,咱們還差 AQS 共享式 xxxShared
沒有分析,結合共享式,接下來咱們來閱讀一下 Semaphore,ReentrantReadWriteLock 和 CountLatch 等
最後,也歡迎你們的留言,若有錯誤之處還請指出。個人手痠了,眼睛幹了,我去準備擼下一篇.....
下面代碼是一個轉帳程序,是否存在死鎖或者鎖的其餘問題呢?
class Account { private int balance; private final Lock lock = new ReentrantLock(); // 轉帳 void transfer(Account tar, int amt){ while (true) { if(this.lock.tryLock()) { try { if (tar.lock.tryLock()) { try { this.balance -= amt; tar.balance += amt; } finally { tar.lock.unlock(); } }//if } finally { this.lock.unlock(); } }//if }//while }//transfer }
日拱一兵 | 原創