jdk的JUC包(java.util.concurrent)提供大量Java併發工具提供使用,基本由Doug Lea編寫,不少地方值得學習和借鑑,是進階升級必經之路java
本文從JUC包中經常使用的對象鎖、併發工具的使用和功能特性入手,帶着問題,由淺到深,一步步剖析併發底層AQS抽象類具體實現node
AQS是一個抽象類,類全路徑java.util.concurrent.locks.AbstractQueuedSynchronizer,抽象隊列同步器,是基於模板模式開發的併發工具抽象類,有以下併發類基於AQS實現:算法
CAS是Conmpare And Swap(比較和交換)的縮寫,是一個原子操做指令安全
CAS機制當中使用了3個基本操做數:內存地址addr,預期舊的值oldVal,要修改的新值newVal 更新一個變量的時候,只有當變量的預期值oldVal和內存地址addr當中的實際值相同時,纔會將內存地址addr對應的值修改成newValbash
基於樂觀鎖的思路,經過CAS再不斷嘗試和比較,能夠對變量值線程安全地更新markdown
線程中斷是一種線程協做機制,用於協做其餘線程中斷任務的執行併發
當線程處於阻塞等待狀態,例如調用了wait()、join()、sleep()方法以後,調用線程的interrupt()方法以後,線程會立刻退出阻塞並收到InterruptedException;框架
當線程處於運行狀態,調用線程的interrupt()方法以後,線程並不會立刻中斷執行,須要在線程的具體任務執行邏輯中經過調用isInterrupted() 方法檢測線程中斷標誌位,而後主動響應中斷,一般是拋出InterruptedException工具
下面先介紹對象鎖、併發工具備哪些基本特性,後面再逐步展開這些特性如何實現性能
以ReentrantLock鎖爲例,主要支持如下4種方式顯式獲取鎖
ReentrantLock lock = new ReentrantLock();
// 一直阻塞等待,直到獲取成功
lock.lock();
複製代碼
ReentrantLock lock = new ReentrantLock(); // 嘗試獲取鎖,若是鎖已被其餘線程佔用,則不阻塞等待直接返回false // 返回true - 鎖是空閒的且被本線程獲取,或者已經被本線程持有 // 返回false - 獲取鎖失敗 boolean isGetLock = lock.tryLock(); 複製代碼
ReentrantLock lock = new ReentrantLock(); try { // 嘗試在指定時間內獲取鎖 // 返回true - 鎖是空閒的且被本線程獲取,或者已經被本線程持有 // 返回false - 指定時間內未獲取到鎖 lock.tryLock(10, TimeUnit.SECONDS); } catch (InterruptedException e) { // 內部調用isInterrupted() 方法檢測線程中斷標誌位,主動響應中斷 e.printStackTrace(); } 複製代碼
ReentrantLock lock = new ReentrantLock();
try {
// 響應中斷獲取鎖
// 若是調用線程的thread.interrupt()方法設置線程中斷,線程退出阻塞等待並拋出中斷異常
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
}
複製代碼
ReentrantLock lock = new ReentrantLock();
lock.lock();
// ... 各類業務操做
// 顯式釋放鎖
lock.unlock();
複製代碼
已經獲取到鎖的線程,再次請求該鎖能夠直接得到
指同一個資源容許多個線程共享,例如讀寫鎖的讀鎖容許多個線程共享,共享鎖可讓多個線程併發安全地訪問數據,提升程序執效率
公平鎖:多個線程採用先到先得的公平方式競爭鎖。每次加鎖前都會檢查等待隊列裏面有沒有線程排隊,沒有才會嘗試獲取鎖。 非公平鎖:當一個線程採用非公平的方式獲取鎖時,該線程會首先去嘗試獲取鎖而不是等待。若是沒有獲取成功,纔會進入等待隊列
由於非公平鎖方式可使後來的線程有必定概率直接獲取鎖,減小了線程掛起等待的概率,性能優於公平鎖
相似Object的wait()、wait(long timeout)、notify()以及notifyAll()的方法結合synchronized內置鎖能夠實現能夠實現等待/通知模式,實現Lock接口的ReentrantLock、ReentrantReadWriteLock等對象鎖也有相似功能:
Condition接口定義了await()、awaitNanos(long)、signal()、signalAll()等方法,配合對象鎖實例實現等待/通知功能,原理是基於AQS內部類ConditionObject實現Condition接口,線程await後阻塞並進入CLH隊列(下面提到),等待其餘線程調用signal方法後被喚醒
CLH隊列,CLH是算法提出者Craig, Landin, Hagersten的名字簡稱
AQS內部維護着一個雙向FIFO的CLH隊列,AQS依賴它來管理等待中的線程,若是線程獲取同步競爭資源失敗時,會將線程阻塞,並加入到CLH同步隊列;當競爭資源空閒時,基於CLH隊列阻塞線程並分配資源
CLH的head節點保存當前佔用資源的線程,或者是沒有線程信息,其餘節點保存排隊線程信息
CLH中每個節點的狀態(waitStatus)取值以下:
AQS定義兩種資源共享方式: Exclusive 獨佔,只有一個線程能執行,如ReentrantLock Share 共享,多個線程可同時執行,如Semaphore/CountDownLatch
AQS 基於sun.misc.Unsafe類提供的park方法阻塞線程,unpark方法喚醒線程,被park方法阻塞的線程能響應interrupt()中斷請求退出阻塞
核心設計思路:AQS提供一個框架,用於實現依賴於CLH隊列的阻塞鎖和相關的併發同步器。子類經過實現斷定是否能獲取/釋放資源的protect方法,AQS基於這些protect方法實現對線程的排隊、喚醒的線程調度策略
AQS還提供一個支持線程安全原子更新的int類型變量做爲同步狀態值(state),子類能夠根據實際需求,靈活定義該變量表明的意義進行更新
經過子類從新定義的系列protect方法以下:
這些方法始終由須要須要調度協做的線程來調用,子類須以非阻塞的方式從新定義這些方法
AQS基於上述tryXXX方法,對外提供下列方法來獲取/釋放資源:
以獨佔模式爲例:獲取/釋放資源的核心的實現以下:
Acquire: while (!tryAcquire(arg)) { 若是線程還沒有排隊,則將其加入隊列; } Release: if (tryRelease(arg)) 喚醒CLH中第一個排隊線程 複製代碼
到這裏,有點繞,下面一張圖把上面介紹到的設計思路再從新捋一捋:
下面介紹基於AQS的對象鎖、併發工具的一系列功能特性的實現原理
該特性仍是以ReentrantLock鎖爲例,ReentrantLock是可重入對象鎖,線程每次請求獲取成功一次鎖,同步狀態值state加1,釋放鎖state減1,state爲0表明沒有任何線程持有鎖
ReentrantLock鎖支持公平/非公平特性,下面的顯式獲取特性以公平鎖爲例
基本實現以下:
ReentrantLock中的tryAcquire(int)方法實現:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 沒有任何線程持有鎖 if (c == 0) { // 經過CLH隊列的head判斷沒有別的線程在比當前更早acquires // 且基於CAS設置state成功(指望的state舊值爲0) if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 設置持有鎖的線程爲當前線程 setExclusiveOwnerThread(current); return true; } } // 持有鎖的線程爲當前線程 else if (current == getExclusiveOwnerThread()) { // 僅僅在當前線程,單線程,不用基於CAS更新 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 其餘線程已經持有鎖 return false; } 複製代碼
AQS的acquire(int)方法實現
public final void acquire(int arg) { // tryAcquire檢查釋放能獲取成功 // addWaiter 構建CLH的節點對象併入隊 // acquireQueued線程阻塞等待 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // acquireQueued返回true,表明線程在獲取資源的過程當中被中斷 // 則調用該方法將線程中斷標誌位設置爲true selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { // 標記是否成功拿到資源 boolean failed = true; try { // 標記等待過程當中是否被中斷過 boolean interrupted = false; // 循環直到資源釋放 for (;;) { // 拿到前驅節點 final Node p = node.predecessor(); // 若是前驅是head,即本節點是第二個節點,纔有資格去嘗試獲取資源 // 多是head釋放完資源喚醒本節點,也可能被interrupt() if (p == head && tryAcquire(arg)) { // 成功獲取資源 setHead(node); // help GC p.next = null; failed = false; return interrupted; } // 須要排隊阻塞等待 // 若是在過程當中線程中斷,不響應中斷 // 且繼續排隊獲取資源,設置interrupted變量爲true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 複製代碼
ReentrantLock中的tryLock()的實現僅僅是非公平鎖實現,實現邏輯基本與tryAcquire一致,不一樣的是沒有經過hasQueuedPredecessors()檢查CLH隊列的head是否有其餘線程在等待,這樣當資源釋放時,有線程請求資源能插隊優先獲取
ReentrantLock中tryLock()具體實現以下:
public boolean tryLock() { return sync.nonfairTryAcquire(1); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 沒有任何線程持有鎖 if (c == 0) { // 基於CAS設置state成功(指望的state舊值爲0) // 沒有檢查CLH隊列中是否有線程在等待 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 持有鎖的線程爲當前線程 else if (current == getExclusiveOwnerThread()) { // 僅僅在當前線程,單線程,不用基於CAS更新 int nextc = c + acquires; if (nextc < 0) // overflow,整數溢出 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 其餘線程已經持有鎖 return false; } 複製代碼
基本實現以下:
ReentrantLock中的實現以下:
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 若是線程已經被interrupt()方法設置中斷 if (Thread.interrupted()) throw new InterruptedException(); // 先tryAcquire嘗試獲取鎖 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } 複製代碼
AQS中的實現以下:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 獲取到資源的截止時間 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); // 標記是否成功拿到資源 boolean failed = true; try { for (;;) { // 拿到前驅節點 final Node p = node.predecessor(); // 若是前驅是head,即本節點是第二個節點,纔有資格去嘗試獲取資源 // 多是head釋放完資源喚醒本節點,也可能被interrupt() if (p == head && tryAcquire(arg)) { // 成功獲取資源 setHead(node); // help GC p.next = null; failed = false; return true; } // 更新剩餘超時時間 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; // 排隊是否須要排隊阻塞等待 // 且超時時間大於1微秒,則線程休眠到超時時間到了再嘗試獲取 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 若是線程已經被interrupt()方法設置中斷 // 則再也不排隊,直接退出 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 複製代碼
ReentrantLock響應中斷獲取鎖的方式是:當線程在park方法休眠中響應thead.interrupt()方法中斷喚醒時,檢查到線程中斷標誌位爲true,主動拋出異常,核心實如今AQS的doAcquireInterruptibly(int)方法中
基本實現與阻塞等待獲取相似,只是調用從AQS的acquire(int)方法,改成調用AQS的doAcquireInterruptibly(int)方法
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); // 標記是否成功拿到資源 boolean failed = true; try { for (;;) { // 拿到前驅節點 final Node p = node.predecessor(); // 若是前驅是head,即本節點是第二個節點,纔有資格去嘗試獲取資源 // 多是head釋放完資源喚醒本節點,也可能被interrupt() if (p == head && tryAcquire(arg)) { // 成功獲取資源 setHead(node); p.next = null; // help GC failed = false; return; } // 須要排隊阻塞等待 if (shouldParkAfterFailedAcquire(p, node) && // 從排隊阻塞中喚醒,若是檢查到中斷標誌位爲true parkAndCheckInterrupt()) // 主動響應中斷 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 複製代碼
AQS資源共享方式分爲獨佔式和共享式,這裏先以ReentrantLock爲例介紹獨佔式資源的顯式釋放,共享式後面會介紹到
與顯式獲取有相似之處,ReentrantLock顯式釋放基本實現以下:
ReentrantLock中tryRelease(int)方法實現以下:
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 只有持有鎖的線程纔有資格釋放鎖 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 標識是否沒有任何線程持有鎖 boolean free = false; // 沒有任何線程持有鎖 // 可重入鎖每lock一次都須要對應一次unlock if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } 複製代碼
AQS中的release(int)方法實現以下:
public final boolean release(int arg) { // 嘗試釋放資源 if (tryRelease(arg)) { Node h = head; // 頭節點不爲空 // 後繼節點入隊後進入休眠狀態以前,會將前驅節點的狀態更新爲SIGNAL(-1) // 頭節點狀態爲0,表明沒有後繼的等待節點 if (h != null && h.waitStatus != 0) // 喚醒第二個節點 // 頭節點是佔用資源的線程,第二個節點纔是首個等待資源的線程 unparkSuccessor(h); return true; } return false; } 複製代碼
可重入的實現比較簡單,以ReentrantLock爲例,主要是在tryAcquire(int)方法中實現,持有鎖的線程是否是當前線程,若是是,更新同步狀態值state,並返回true,表明能獲取鎖
可共享資源以ReentrantReadWriteLock爲例,跟獨佔鎖ReentrantLock的區別主要在於,獲取的時候,多個線程容許共享讀鎖,當寫鎖釋放時,多個阻塞等待讀鎖的線程能同時獲取到
ReentrantReadWriteLock類中將AQS的state同步狀態值定義爲,高16位爲讀鎖持有數,低16位爲寫鎖持有鎖
ReentrantReadWriteLock中tryAcquireShared(int)、tryReleaseShared(int)實現的邏輯較長,主要涉及讀寫互斥、可重入判斷、讀鎖對寫鎖的讓步,篇幅所限,這裏就不展開了
獲取讀鎖(ReadLock.lock())主要實現以下:
AQS中共享模式獲取資源的具體實現以下:
public final void acquireShared(int arg) { // tryAcquireShared返回負數表明獲取共享資源失敗 // 則經過進入等待隊列,直到獲取到資源爲止才返回 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // 與前面介紹到的acquireQueued邏輯基本一致 // 不一樣的是將tryAcquire改成tryAcquireShared // 還有資源獲取成功後將傳播給CLH隊列上等待該資源的節點 private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); // 標記是否成功拿到資源 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); // 資源獲取成功 if (r >= 0) { // 傳播給CLH隊列上等待該資源的節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 須要排隊阻塞等待 // 若是在過程當中線程中斷,不響應中斷 // 且繼續排隊獲取資源,設置interrupted變量爲true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 資源傳播給CLH隊列上等待該資源的節點 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 釋放共享資源 doReleaseShared(); } } 複製代碼
釋放讀鎖(ReadLock.unlock())主要實現以下: ReentrantReadWriteLock中共享資源的釋放主要實現以下:
AQS中共享模式釋放資源具體實現以下:
public final boolean releaseShared(int arg) { // 容許喚醒CLH中的休眠線程 if (tryReleaseShared(arg)) { // 執行資源釋放 doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 當前節點正在等待資源 if (ws == Node.SIGNAL) { // 當前節點被其餘線程喚醒了 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } // 進入else的條件是,當前節點剛剛成爲頭節點 // 尾節點剛剛加入CLH隊列,還沒在休眠前將前驅節點狀態改成SIGNAL // CAS失敗是尾節點已經在休眠前將前驅節點狀態改成SIGNAL else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 每次喚醒後驅節點後,線程進入doAcquireShared方法,而後更新head // 若是h變量在本輪循環中沒有被改變,說明head == tail,隊列中節點所有被喚醒 if (h == head) break; } } 複製代碼
這個特性實現比較簡單,以ReentrantLock鎖爲例,公平鎖直接基於AQS的acquire(int)獲取資源,而非公平鎖先嚐試插隊:基於CAS,指望state同步變量值爲0(沒有任何線程持有鎖),更新爲1,若是所有CAS更新失敗再進行排隊
// 公平鎖實現 final void lock() { acquire(1); } // 非公平鎖實現 final void lock() { // 第1次CAS // state值爲0表明沒有任何線程持有鎖,直接插隊得到鎖 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 在nonfairTryAcquire方法中再次CAS嘗試獲取鎖 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 第2次CAS嘗試獲取鎖 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 已經得到鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 複製代碼
AQS的state變量值的含義不必定表明資源,不一樣的AQS的繼承類能夠對state變量值有不一樣的定義
例如在countDownLatch類中,state變量值表明還需釋放的latch計數(能夠理解爲須要打開的門閂數),須要每一個門閂都打開,門才能打開,全部等待線程纔會開始執行,每次countDown()就會對state變量減1,若是state變量減爲0,則喚醒CLH隊列中的休眠線程
學習相似底層源碼建議先定幾個問題,帶着問題學習;通俗學習前建議先理解透徹總體設計,總體原理(能夠先閱讀相關文檔資料),再研究和源碼細節,避免一開始就扎進去源碼,容易無功而返