在多線程編程中咱們會遇到不少須要使用線程同步機制去解決的併發問題,而這些同步機制就是多線程編程中影響正確性和運行效率的重中之重。這不由讓我感到好奇,這些同步機制是如何實現的呢?好奇心是進步的源泉,就讓咱們一塊兒來揭開同步機制源碼的神祕面紗吧。java
在本文中,咱們會從JDK中大多數同步機制的共同基礎AbstractQueuedSynchronizer
類開始提及,而後經過源碼瞭解咱們最經常使用的兩個同步類可重入鎖ReentrantLock
和閉鎖CountDownLatch
的具體實現。經過這篇文章咱們將能夠了解到ReentrantLock
和CountDownLatch
兩個經常使用同步類的源代碼實現,而且掌握閱讀其餘基於AQS實現的同步工具類源碼的能力,甚至能夠利用AQS寫出本身的同步工具類。編程
閱讀這篇文章須要瞭解基本的線程同步機制,有興趣的讀者能夠參考一下這篇文章《多線程中那些看不到的陷阱》。安全
ReentrantLock
是咱們經常使用的一種可重入互斥鎖,是synchronized關鍵字的一個很好的替代品。互斥指的就是同一時間只能有一個線程獲取到這個鎖,而可重入是指若是一個線程再次獲取一個它已經持有的互斥鎖,那麼仍然會成功。數據結構
這個類的源碼在JDK的java.util.concurrent
包下,咱們能夠在IDE中點擊類名跳轉到具體的類定義,好比下面就是在個人電腦上跳轉以後看到的ReentrantLock類的源代碼。在這裏咱們能夠看到在ReentrantLock類中還包含了一個繼承自AbstractQueuedSynchronizer類的內部類,並且有一個該內部類Sync
類型的字段sync
。實際上ReentrantLock類就是經過這個內部類對象來實現線程同步的。多線程
若是打開CountDownLatch
的源代碼,咱們會發現這個類裏也一樣有一個繼承自AbstractQueuedSynchronizer類的子類Sync,而且也有一個Sync
類型的字段sync
。在java.util.concurrent
包下的大多數同步工具類的底層都是經過在內部定義一個AbstractQueuedSynchronizer
類的子類來實現的,包括咱們在本文中沒提到的許多其餘經常使用類也是如此,好比:讀寫鎖ReentrantReadWriteLock、信號量Semaphore等。併發
那麼這個AbstractQueuedSynchronizer
類也就是咱們所說的AQS,究竟是何方神聖呢?這個類首先像咱們上面提到的,是大多數多線程同步工具類的基礎。它內部包含了一個對同步器的等待隊列,其中包含了全部在等待獲取同步器的線程,在這個等待隊列中的線程將會在同步器釋放時被喚醒。好比一個線程在獲取互斥鎖失敗時就會被放入到等待隊列中等待被喚醒,這也就是AQS中的Q——「Queued」的由來。工具
而類名中的第一個單詞Abstract
是由於AQS是一個抽象類,它的使用方法就是實現繼承它的子類,而後使用這個子類類型的對象。在這個子類中咱們會經過重寫下列的五個方法中的一部分或者所有來指定這個同步器的行爲策略:ui
boolean tryAcquire(int arg)
,獨佔式獲取同步器,獨佔式指同一時間只能有一個線程獲取到同步器;boolean tryRelease(int arg)
,獨佔式釋放同步器;boolean isHeldExclusively()
,同步器是否被當前線程獨佔式地持有;int tryAcquireShared(int arg)
,共享式獲取同步器,共享式指的是同一時間可能有多個線程同時獲取到同步器,可是可能會有數量的限制;boolean tryReleaseShared(int arg)
,共享式釋放同步器。這五個方法之因此能指定同步器的行爲,則是由於AQS中的其餘方法就是經過對這五個方法的調用來實現的。好比在下面的acquire方法中就調用了tryAcquire來獲取同步器,而且在被調用的acquireQueued
方法內部也是經過tryAcquire
方法來循環嘗試獲取同步器的。this
public final void acquire(int arg) { // 1. 調用tryAcquire方法嘗試獲取鎖 // 2. 若是獲取失敗(tryAcquire返回false),則調用addWaiter方法將當前線程保存到等待隊列中 // 3. 以後調用acquireQueued方法來循環執行「獲取同步器 -> 獲取失敗休眠 -> 被喚醒從新獲取」過程 // 直到成功獲取到同步器返回false;或者被中斷返回true if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若是acquireQueued方法返回true說明線程被中斷了 // 因此調用selfInterrupt方法中斷當前線程 selfInterrupt(); }
下面,咱們就來看看在ReentrantLock
和CountDownLatch
兩個類中定義的AQS子類究竟是如何重寫這五個方法的。spa
CountDownLatch
是一種典型的閉鎖,好比我須要使用四個線程完成四種不一樣的計算,而後把四個線程的計算結果相加後返回,這種狀況下主線程就須要等待四個完成不一樣任務的工做線程完成以後才能繼續執行。那麼咱們就能夠建立一個初始的count值爲4的CountDownLatch
,而後在每一個工做線程完成任務時都對這個CountDownLatch
執行一個countDown
操做,這樣CountDownLatch中的count值就會減1。當count值減到0時,主線程就會從阻塞中恢復,而後將四個任務的結果相加後返回。
下面是CountDownLath
的幾個經常使用方法:
void await()
,等待操做,若是count值目前已是0了,那麼就直接返回;不然就進入阻塞狀態,等待count值變爲0;void countDown()
,減小計數操做,會讓count減1。調用屢次countDown()
方法讓count值變爲0以後,被await()
方法阻塞的線程就能夠繼續執行了。瞭解了CountDownLatch
的基本用法以後咱們就來看看這個閉鎖究竟是怎麼實現的,首先,咱們來看一下CountDownLatch
中AQS的子類,內部類Sync
的定義。
下面的代碼是CountDownLatch
中AQS的子類Sync
的定義,Sync
是CountDownLatch
類中的一個內部類。在這個類中重寫了AQS的tryAcquireShared
和tryReleaseShared
兩個方法,這兩個都是共享模式須要重寫的方法,由於CountDownLatch
在count值爲0時能夠被任意多個線程同時獲取成功,因此應該實現共享模式的方法。
在CountDownLatch
的Sync
中使用了AQS的state值用來存放count值,在初始化時會把state值初始化爲n。而後在調用tryReleaseShared
時會將count值減1,可是由於這個方法可能會被多個線程同時調用,因此要用CAS操做保證更新操做的原子性,就像咱們用AtomicInteger
同樣。在CAS失敗時咱們須要經過重試來保證把state減1,若是CAS成功時,即便有許多線程同時執行這個操做最後的結果也必定是正確的。在這裏,tryReleaseShared
方法的返回值表示這個釋放操做是否可讓等待中的線程成功獲取同步器,因此只有在count爲0時才能返回true。
tryAcquireShared
方法就比較簡單了,直接返回state是否等於0便可,由於只有在CountDownLatch
中的count值爲0時全部但願獲取同步器的線程才能獲取成功並繼續執行。若是count不爲0,那麼線程就須要進入阻塞狀態,等到count值變爲0才能繼續執行。
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 構造器,初始化count值 // 在這個子類中把count值保存到了AQS的state中 Sync(int count) { setState(count); } // 獲取當前的count值 int getCount() { return getState(); } // 獲取操做在state爲0時會成功,不然失敗 // tryAcquireShared失敗時,線程會進入阻塞狀態等待獲取成功 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 對閉鎖執行釋放操做減少計數值 protected boolean tryReleaseShared(int releases) { // 減少coun值,在count值歸零時喚醒等待的線程 for (;;) { int c = getState(); // 若是計數已經歸零,則直接釋放失敗 if (c == 0) return false; // 將計數值減1 int nextc = c-1; // 爲了線程安全,以CAS循環嘗試更新 if (compareAndSetState(c, nextc)) return nextc == 0; } } }
看了CountDownLatch
中的Sync
內部類定義以後,咱們再來看看CountDownLatch
是如何使用這個內部類的。
在CountDownLatch
的構造器中,初始化CountDownLatch
對象時會同時在其內部初始化保存一個Sync
類型的對象到sync
字段用於以後的同步操做。而且傳入Sync
類構造器的count必定會大於等於0。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
有了Sync類型的對象以後,咱們在await()
方法裏就能夠直接調用sync
的acquireSharedInterruptibly
方法來獲取同步器並陷入阻塞,等待count值變爲0了。在AQS的acquireSharedInterruptibly
方法中會在調用咱們重寫的tryAcquireShared
方法獲取失敗時進入阻塞狀態,直到CountDownLatch
的count值變爲0時才能成功獲取到同步器。
public void await() throws InterruptedException { // 調用sync對象的獲取方法來進入鎖等待 sync.acquireSharedInterruptibly(1); }
而在CountDownLatch
的另外一個減小count值的重要方法countDown()
中,咱們一樣是經過調用sync
上的方法來實現具體的同步功能。在這裏,AQS的releaseShared(1)
方法中一樣會調用咱們在Sync
類中重寫的tryReleaseShared
方法來執行釋放操做,並在tryReleaseShared
方法返回true時去喚醒等待隊列中的阻塞等待線程,讓它們在count值爲0時可以繼續執行。
public void countDown() { sync.releaseShared(1); }
從上文中能夠看出,CoundDownLatch
中的各類功能都是經過內部類Sync
來實現的,而這個Sync
類就是一個繼承自AQS的子類。經過在內部類Sync
中重寫了AQS的tryAcquireShared
和tryReleaseShared
兩個方法,咱們就指定了AQS的行爲策略,使其可以符合咱們對CountDownLatch
功能的指望。這就是AQS的使用方法,下面咱們來看一個你們可能會更熟悉的例子,來進一步瞭解AQS在獨佔模式下的用法。
可重入鎖ReentrantLock
能夠說是咱們的老朋友了,從最先的synchronized
關鍵字開始,咱們就開始使用相似的功能了。可重入鎖的特色主要有兩點:
同一時間只能有一個線程持有
同一線程能夠對一個鎖重複獲取成功屢次
ReentrantLock
類的經常使用操做主要有三種:
獲取鎖,一個線程一旦獲取鎖成功後就會阻塞其餘線程獲取同一個鎖的操做,因此一旦獲取失敗,那麼當前線程就會被阻塞
public void lock()
方法釋放鎖,獲取鎖以後就要在使用完以後釋放它,不然別的線程都將會因沒法獲取鎖而被阻塞,因此咱們通常會在finally中進行鎖的釋放操做
ReentrantLock
對象的unlock
方法來釋放鎖獲取條件變量,條件變量是和互斥鎖搭配使用的一種很是有用的數據結構,有興趣的讀者能夠經過《從0到1實現本身的阻塞隊列(上)》這篇文章來了解條件變量具體的使用方法
Condition newCondition()
方法來獲取條件變量對象,而後調用條件變量對象上的await()
、signal()
、signalAll()
方法來進行使用在ReentrantLock
類中存在兩種AQS的子類,一個實現了非公平鎖,一個實現了公平鎖。所謂的「公平」指的就是獲取互斥鎖成功返回的時間會和獲取鎖操做發起的時間順序一致,例若有線程A已經持有了互斥鎖,當線程B、C、D按字母順序獲取鎖並進入等待,線程A釋放鎖後必定是線程B被喚醒,線程B釋放鎖後必定是C先被喚醒。也就是說鎖被釋放後對等待線程的喚醒順序和獲取鎖操做的順序一致。並且若是在這個過程當中,有其餘線程發起了獲取鎖操做,由於等待隊列中已經有線程在等待了,那麼這個線程必定要排到等待隊列最後去,而不能直接搶佔剛剛被釋放還未被剛剛被喚醒的線程鎖持有的鎖。
下面咱們一樣先看一下ReentrantLock
類中定義的AQS子類Sync
的具體源代碼。下面是上一段說到的非公平Sync類和公平Sync類兩個類的共同父類Sync
的帶註釋源代碼,裏面包含了大部分核心功能的實現。雖然下面包含了該類完整的源代碼,可是咱們如今只須要關心三個核心操做,也是咱們在獨佔模式下須要重寫的三個AQS方法:tryAcquire
、tryRelease
和isHeldExclusively
。建議在看完文章以後再回來回顧該類中其餘的方法實現,直接跳過其餘的方法固然也是徹底沒有問題的。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 實現Lock接口的lock方法,子類化的主要緣由是爲了非公平版本的快速實現 */ abstract void lock(); /** * 執行非公平的tryLock。tryAcquire方法在子類中被實現,可是二者都須要非公平版本的trylock方法實現。 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 若是鎖還未被持有 if (c == 0) { // 經過CAS嘗試獲取鎖 if (compareAndSetState(0, acquires)) { // 若是鎖獲取成功則將鎖持有者改成當前線程,並返回true setExclusiveOwnerThread(current); return true; } } // 鎖已經被持有,則判斷鎖的持有者是不是當前線程 else if (current == getExclusiveOwnerThread()) { // 可重入鎖,若是鎖的持有者是當前線程,那就在state上加上新的獲取數 int nextc = c + acquires; // 判斷新的state值有沒有溢出 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 將新的state更新爲新的值,由於能夠進入這段代碼的只有一個線程 // 因此不須要線程安全措施 setState(nextc); return true; } return false; } // 重寫了AQS的獨佔式釋放鎖方法 protected final boolean tryRelease(int releases) { // 計算剩餘的鎖持有量 // 由於只有當前線程持有該鎖的狀況下才能執行這個方法,因此不須要作多線程保護 int c = getState() - releases; // 若是當前線程未持有鎖,則直接拋出錯誤 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 若是鎖持有數已經減小到0,則釋放該鎖,並清空鎖持有者 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 更新state值,只有state值被設置爲0纔是真正地釋放了鎖 // 因此setState和setExclusiveOwnerThread之間不須要額外的同步措施 setState(c); return free; } // 當前線程是否持有該鎖 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } // 建立對應的條件變量 final ConditionObject newCondition() { return new ConditionObject(); } // 從外層傳遞進來的方法 // 獲取當前的鎖持有者 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 獲取鎖的持有計數 // 若是當前線程持有了該鎖則返回state值,不然返回0 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // 判斷鎖是否已經被持有 final boolean isLocked() { return getState() != 0; } }
實際的tryAcquire
方法將在公平Sync類與非公平Sync類兩個子類中實現,可是這兩個子類都須要調用父類Sync
中的非公平版本的tryAcquire——nonfairTryAcquire
方法。在這個方法中,咱們主要作兩件事:
當前鎖還未被人持有。在ReentrantLock
中使用AQS的state來保存鎖的狀態,state等於0時表明鎖沒有被任何線程持有,若是state大於0,那麼就表明持有者對該鎖的重複獲取次數
compareAndSetState
來原子性地修改state值,修改爲功則須要設置當前線程爲鎖的持有線程並返回true表明獲取成功;不然就返回鎖已被當前線程持有
而對於獨佔式釋放同步器的tryRelease
方法,則在父類Sync
中直接實現了,兩個公平/非公平子類調用的都是同一段代碼。首先,只有鎖的持有者才能釋放鎖,因此若是當前線程不是全部者線程在釋放操做中就會拋出異常。若是釋放操做會將持有計數清零,那麼當前線程就再也不是該鎖的持有者了,鎖會被徹底釋放,而鎖的全部者會被設置爲null。最後,Sync
會將減掉入參中的釋放數以後的新持有計數更新到AQS的state中,並返回鎖是否已經被徹底釋放了。
isHeldExclusively
方法比較簡單,它只是檢查鎖的持有者是不是當前線程。
Sync
的兩個公平/非公平子類的實現比較簡單,下面是非公平版本子類的源代碼。在非公平版本的實現中,調用lock
方法首先會嘗試經過CAS修改AQS的state值來直接搶佔鎖,若是搶佔成功就直接將持有者設置爲當前線程;若是搶佔失敗就調用acquire
方法走正常流程來獲取鎖。而在acquire
方法中就會調用子類中的tryAcquire
方法並進一步調用到上文提到的父類中的nonfairTryAcquire
方法來完成鎖獲取操做。
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 執行鎖操做。嘗試直接搶佔,若是失敗的話就回到正常的獲取流程進行 */ final void lock() { // 嘗試直接搶佔 if (compareAndSetState(0, 1)) // 搶佔成功設置鎖全部者 setExclusiveOwnerThread(Thread.currentThread()); else // 搶佔失敗走正常獲取流程 acquire(1); } // 實現AQS方法,使用nonfairTryAcquire實現 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
而在公平版本的Sync子類FairSync
中,爲了保證成功獲取到鎖的順序必定要和發起獲取鎖操做的順序一致,因此天然不能在lock
方法中進行CAS方式的搶佔,只能老老實實調用acquire
方法走正式流程。而acquire
方法最終就會調用子類中定義的tryAcquire
來真正獲取鎖。
在tryAcquire
方法中,代碼主要處理了兩種狀況:
當前鎖尚未被線程鎖持有
acquire
方法中的代碼會將當前線程放入到等待隊列中阻塞等待鎖的釋放。這就保證了在獲取鎖時已經有線程等待的狀況下,任何線程都要進入等待隊列去等待獲取鎖,而不能直接對鎖進行獲取。當前線程已經持有了該鎖
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 直接使用acquire進行獲取鎖操做 final void lock() { acquire(1); } /** * 公平版本的tryAcquire方法。不要授予訪問權限,除非是遞歸調用或者沒有等待線程或者這是第一個調用 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 若是鎖沒有被持有 if (c == 0) { // 爲了實現公平特性,因此只有在等待隊列爲空的狀況下才能直接搶佔 // 不然只能進入隊列等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 若是鎖已被持有,且當前線程就是持有線程 else if (current == getExclusiveOwnerThread()) { // 計算新的state值 int nextc = c + acquires; // 若是鎖計數溢出,則拋出異常 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 設置state狀態值 setState(nextc); return true; } return false; } }
最後,咱們來看看ReentrantLock類中的lock()
、unlock()
、newCondition
方法對Sync類對象的使用方式。
首先是在構造器中,根據入參指定的公平/非公平模式建立不一樣的內部Sync
類對象,若是是公平模式就是用FairSync
類,若是是非公平模式就是用NonfairSync
類。
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
而後在互斥鎖的鎖定方法lock()
中,ReentrantLock
直接使用Sync類中的lock
方法來實現了鎖的獲取功能。
public void lock() { // 調用sync對象的lock方法實現 sync.lock(); }
在unlock()
方法中也是同樣的狀況,ReentrantLock
直接依賴Sync類對象來實現這個功能。
public void unlock() { // 調用了sync對象的release方法實現 sync.release(1); }
最後一個建立條件變量的方法則直接依賴於AQS中定義的方法,咱們在ReentranctLock
的Sync
類中並不須要作任務額外的工做,AQS就能爲咱們作好全部的事情。
public Condition newCondition() { // 調用了sync對象繼承自AQS的`newCondition`方法實現 return sync.newCondition(); }
經過ReentrantLock
的例子咱們可以更明顯地感覺到,這些基於AQS實現同步功能的類中並不須要作太多額外的工做,大多數操做都是經過直接調用Sync
類對象上的方法來實現的。只要定義好了繼承自AQS的子類Sync
,並經過Sync
類重寫幾個AQS的關鍵方法來指定AQS的行爲策略,就能夠實現風格迥異的各類同步工具類了。
在這篇文章中,咱們從AQS的基本概念提及,簡單介紹了AQS的具體用法,而後經過CountDownLatch
和ReentrantLock
兩個經常使用的多線程同步工具類的源碼來具體瞭解了AQS的使用方式。咱們不只能夠徹底弄明白這兩個線程同步類的實現原理與細節,並且最重要的是找到了AQS這個幕後大BOSS。經過AQS,咱們不只能夠更容易地閱讀並理解其餘同步工具類的使用與實現,並且甚至能夠動手開發出咱們本身的自定義同步工具類。
到了這裏,這一系列多線程編程相關的技術文章就接近尾聲了。後續我還會發布一篇囊括這個系列全部內容的總結性文章,裏面會對多線程編程相關的知識脈絡作一次全面的梳理,而後將每一個知識點連接到具體闡釋這個主題的文章中去。讓讀者能夠在宏觀和微觀兩個層面理解多線程編程的原理與技巧,幫助你們創建完整的Java多線程理論與實踐知識體系。有興趣的讀者能夠關注一下後續的文章,感謝你們的支持。