常見問題:AQS 原理?;CountDownLatch和CyclicBarrier瞭解嗎,二者的區別是什麼?用過Semaphore嗎?
本節思惟導圖:html
【強烈推薦!非廣告!】阿里雲雙11褥羊毛活動: https://m.aliyun.com/act/team1111/#/share?params=N.FF7yxCciiM.hf47liqn 差很少一折,不過僅限阿里雲新人購買,不是新人的朋友本身找方法買哦!
AQS的全稱爲(AbstractQueuedSynchronizer),這個類在java.util.concurrent.locks包下面。java
AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用普遍的大量的同步器,好比咱們提到的ReentrantLock,Semaphore,其餘的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基於AQS的。固然,咱們本身也能利用AQS很是輕鬆容易地構造出符合咱們本身需求的同步器。面試
在面試中被問到併發知識的時候,大多都會被問到「請你說一下本身對於AQS原理的理解」。下面給你們一個示例供你們參加,面試不是背題,你們必定要假如本身的思想,即便加入不了本身的思想也要保證本身可以通俗的講出來而不是背出來。
下面大部份內容其實在AQS類註釋上已經給出了,不過是英語看着比較吃力一點,感興趣的話能夠看看源碼。編程
AQS核心思想是,若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,而且將共享資源設置爲鎖定狀態。若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。設計模式
CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列(虛擬的雙向隊列即不存在隊列實例,僅存在結點之間的關聯關係)。AQS是將每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實現鎖的分配。
看個AQS(AbstractQueuedSynchronizer)原理圖:安全
AQS使用一個int成員變量來表示同步狀態,經過內置的FIFO隊列來完成獲取資源線程的排隊工做。AQS使用CAS對該同步狀態進行原子操做實現對其值的修改。多線程
private volatile int state;//共享變量,使用volatile修飾保證線程可見性
狀態信息經過procted類型的getState,setState,compareAndSetState進行操做併發
//返回同步狀態的當前值 protected final int getState() { return state; } // 設置同步狀態的值 protected final void setState(int newState) { state = newState; } //原子地(CAS操做)將同步狀態值設置爲給定值update若是當前同步狀態的值等於expect(指望值) protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS定義兩種資源共享方式框架
Exclusive(獨佔):只有一個線程能執行,如ReentrantLock。又可分爲公平鎖和非公平鎖:ide
ReentrantReadWriteLock 能夠當作是組合式,由於ReentrantReadWriteLock也就是讀寫鎖容許多個線程同時對某一資源進行讀。
不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源 state 的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在上層已經幫咱們實現好了。
同步器的設計是基於模板方法模式的,若是須要自定義同步器通常的方式是這樣(模板方法模式很經典的一個應用):
這和咱們以往經過實現接口的方式有很大區別,這是模板方法模式很經典的一個運用,下面簡單的給你們介紹一下模板方法模式,模板方法模式是一個很容易理解的設計模式之一。
模板方法模式是基於」繼承「的,主要是爲了在不改變模板結構的前提下在子類中從新定義模板中的內容以實現複用代碼。舉個很簡單的例子假如咱們要去一個地方的步驟是:購票buyTicket()
->安檢securityCheck()
->乘坐某某工具回家ride()
->到達目的地arrive()
。咱們可能乘坐不一樣的交通工具回家好比飛機或者火車,因此除了ride()
方法,其餘方法的實現幾乎相同。咱們能夠定義一個包含了這些方法的抽象類,而後用戶根據本身的須要繼承該抽象類而後修改ride()
方法。
AQS使用了模板方法模式,自定義同步器時須要重寫下面幾個AQS提供的模板方法:
isHeldExclusively()//該線程是否正在獨佔資源。只有用到condition才須要去實現它。 tryAcquire(int)//獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。 tryRelease(int)//獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。 tryAcquireShared(int)//共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。 tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
默認狀況下,每一個方法都拋出 UnsupportedOperationException
。 這些方法的實現必須是內部線程安全的,而且一般應該簡短而不是阻塞。AQS類中的其餘方法都是final ,因此沒法被其餘類使用,只有這幾個方法能夠被其餘類使用。
以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。
再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS(Compare and Swap)減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。
通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock
。
推薦兩篇 AQS 原理和相關源碼分析的文章:
synchronized 和 ReentrantLock 都是一次只容許一個線程訪問某個資源,Semaphore(信號量)能夠指定多個線程同時訪問某個資源。示例代碼以下:
/** * * @author Snailclimb * @date 2018年9月30日 * @Description: 須要一次性拿一個許可的狀況 */ public class SemaphoreExample1 { // 請求的數量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 建立一個具備固定線程數量的線程池對象(若是這裏線程池的線程數量給太少的話你會發現執行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); // 一次只能容許執行的線程數量。 final Semaphore semaphore = new Semaphore(20); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表達式的運用 try { semaphore.acquire();// 獲取一個許可,因此可運行線程數量爲20/1=20 test(threadnum); semaphore.release();// 釋放一個許可 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模擬請求的耗時操做 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模擬請求的耗時操做 } }
執行 acquire
方法阻塞,直到有一個許可證能夠得到而後拿走一個許可證;每一個 release
方法增長一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並無實際的許可證這個對象,Semaphore只是維持了一個可得到許可證的數量。 Semaphore常常用於限制獲取某種資源的線程數量。
固然一次也能夠一次拿取和釋放多個許可,不過通常沒有必要這樣作:
semaphore.acquire(5);// 獲取5個許可,因此可運行線程數量爲20/5=4 test(threadnum); semaphore.release(5);// 獲取5個許可,因此可運行線程數量爲20/5=4
除了 acquire
方法以外,另外一個比較經常使用的與之對應的方法是tryAcquire
方法,該方法若是獲取不到許可就當即返回false。
Semaphore 有兩種模式,公平模式和非公平模式。
Semaphore 對應的兩個構造方法以下:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
這兩個構造方法,都必須提供許可的數量,第二個構造方法能夠指定是公平模式仍是非公平模式,默認非公平模式。
因爲篇幅問題,若是對 Semaphore 源碼感興趣的朋友能夠看下面這篇文章:
CountDownLatch是一個同步工具類,用來協調多個線程之間的同步。這個工具一般用來控制線程等待,它可讓某一個線程等待直到倒計時結束,再開始執行。
①某一線程在開始運行前等待n個線程執行完畢。將 CountDownLatch 的計數器初始化爲n :new CountDownLatch(n)
,每當一個任務線程執行完畢,就將計數器減1 countdownlatch.countDown()
,當計數器的值變爲0時,在CountDownLatch上 await()
的線程就會被喚醒。一個典型應用場景就是啓動一個服務時,主線程須要等待多個組件加載完畢,以後再繼續執行。
②實現多個線程開始執行任務的最大並行性。注意是並行性,不是併發,強調的是多個線程在某一時刻同時開始執行。相似於賽跑,將多個線程放到起點,等待發令槍響,而後同時開跑。作法是初始化一個共享的 CountDownLatch
對象,將其計數器初始化爲 1 :new CountDownLatch(1)
,多個線程在開始執行任務前首先 coundownlatch.await()
,當主線程調用 countDown() 時,計數器變爲0,多個線程同時被喚醒。
/** * * @author SnailClimb * @date 2018年10月1日 * @Description: CountDownLatch 使用方法示例 */ public class CountDownLatchExample1 { // 請求的數量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 建立一個具備固定線程數量的線程池對象(若是這裏線程池的線程數量給太少的話你會發現執行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表達式的運用 try { test(threadnum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { countDownLatch.countDown();// 表示一個請求已經被完成 } }); } countDownLatch.await(); threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模擬請求的耗時操做 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模擬請求的耗時操做 } }
上面的代碼中,咱們定義了請求的數量爲550,當這550個請求被處理完成以後,纔會執行System.out.println("finish");
。
CountDownLatch是一次性的,計數器的值只能在構造方法中初始化一次,以後沒有任何機制再次對其設置值,當CountDownLatch使用完畢後,它不能再次被使用。
CyclicBarrier 和 CountDownLatch 很是相似,它也能夠實現線程間的技術等待,可是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 相似。
CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是 CyclicBarrier(int parties)
,其參數表示屏障攔截的線程數量,每一個線程調用await
方法告訴 CyclicBarrier 我已經到達了屏障,而後當前線程被阻塞。
CyclicBarrier 能夠用於多線程計算數據,最後合併計算結果的應用場景。好比咱們用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個賬戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。
示例1:
/** * * @author Snailclimb * @date 2018年10月1日 * @Description: 測試 CyclicBarrier 類中帶參數的 await() 方法 */ public class CyclicBarrierExample2 { // 請求的數量 private static final int threadCount = 550; // 須要同步的線程數量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException { // 建立線程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); try { cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { System.out.println("-----CyclicBarrierException------"); } System.out.println("threadnum:" + threadnum + "is finish"); } }
運行結果,以下:
threadnum:0is ready threadnum:1is ready threadnum:2is ready threadnum:3is ready threadnum:4is ready threadnum:4is finish threadnum:0is finish threadnum:1is finish threadnum:2is finish threadnum:3is finish threadnum:5is ready threadnum:6is ready threadnum:7is ready threadnum:8is ready threadnum:9is ready threadnum:9is finish threadnum:5is finish threadnum:8is finish threadnum:7is finish threadnum:6is finish ......
能夠看到當線程數量也就是請求數量達到咱們定義的 5 個的時候, await
方法以後的方法才被執行。
另外,CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction)
,用於在線程到達屏障時,優先執行barrierAction
,方便處理更復雜的業務場景。示例代碼以下:
/** * * @author SnailClimb * @date 2018年10月1日 * @Description: 新建 CyclicBarrier 的時候指定一個 Runnable */ public class CyclicBarrierExample3 { // 請求的數量 private static final int threadCount = 550; // 須要同步的線程數量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { System.out.println("------當線程數達到以後,優先執行------"); }); public static void main(String[] args) throws InterruptedException { // 建立線程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); cyclicBarrier.await(); System.out.println("threadnum:" + threadnum + "is finish"); } }
運行結果,以下:
threadnum:0is ready threadnum:1is ready threadnum:2is ready threadnum:3is ready threadnum:4is ready ------當線程數達到以後,優先執行------ threadnum:4is finish threadnum:0is finish threadnum:2is finish threadnum:1is finish threadnum:3is finish threadnum:5is ready threadnum:6is ready threadnum:7is ready threadnum:8is ready threadnum:9is ready ------當線程數達到以後,優先執行------ threadnum:9is finish threadnum:5is finish threadnum:6is finish threadnum:8is finish threadnum:7is finish ......
CountDownLatch是計數器,只能使用一次,而CyclicBarrier的計數器提供reset功能,能夠屢次使用。可是我不那麼認爲它們之間的區別僅僅就是這麼簡單的一點。咱們來從jdk做者設計的目的來看,javadoc是這麼描述它們的:
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一個或者多個線程,等待其餘多個線程完成某件事情以後才能執行;)
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多個線程互相等待,直到到達同一個同步點,再繼續一塊兒執行。)
對於CountDownLatch來講,重點是「一個線程(多個線程)等待」,而其餘的N個線程在完成「某件事情」以後,能夠終止,也能夠等待。而對於CyclicBarrier,重點是多個線程,在任意一個線程沒有完成,全部的線程都必須等待。
CountDownLatch是計數器,線程完成一個記錄一個,只不過計數不是遞增而是遞減,而CyclicBarrier更像是一個閥門,須要全部線程都到達,閥門才能打開,而後繼續執行。
CyclicBarrier和CountDownLatch的區別這部份內容參考了以下兩篇文章:
ReentrantLock 和 synchronized 的區別在上面已經講過了這裏就很少作講解。另外,須要注意的是:讀寫鎖 ReentrantReadWriteLock 能夠保證多個線程能夠同時讀,因此在讀操做遠大於寫操做的時候,讀寫鎖就很是有用了。
因爲篇幅問題,關於 ReentrantLock 和 ReentrantReadWriteLock 詳細內容能夠查看個人這篇原創文章。