public class CopyOnWriteArrayList<E> extends Object implements List<E>, RandomAccess, Cloneable, Serializable
在不少應用場景中,讀操做可能會遠遠大於寫操做。因爲讀操做根本不會修改原有的數據,所以對於每次讀取都進行加鎖實際上是一種資源浪費。咱們應該容許多個線程同時訪問List的內部數據,畢竟讀取操做是安全的。java
這和咱們以前在多線程章節講過 ReentrantReadWriteLock
讀寫鎖的思想很是相似,也就是讀讀共享、寫寫互斥、讀寫互斥、寫讀互斥。JDK中提供了 CopyOnWriteArrayList
類比相比於在讀寫鎖的思想又更進一步。爲了將讀取的性能發揮到極致,CopyOnWriteArrayList
讀取是徹底不用加鎖的,而且更厲害的是:寫入也不會阻塞讀取操做。只有寫入和寫入之間須要進行同步等待。這樣一來,讀操做的性能就會大幅度提高。那它是怎麼作的呢?面試
CopyOnWriteArrayList
類的全部可變操做(add,set等等)都是經過建立底層數組的新副原本實現的。當 List 須要被修改的時候,我並不修改原有內容,而是對原有數據進行一次複製,將修改的內容寫入副本。寫完以後,再將修改完的副本替換原來的數據,這樣就能夠保證寫操做不會影響讀操做了。算法
從 CopyOnWriteArrayList
的名字就能看出CopyOnWriteArrayList
是知足CopyOnWrite
的ArrayList,所謂CopyOnWrite
也就是說:在計算機,若是你想要對一塊內存進行修改時,咱們不在原有內存塊中進行寫操做,而是將內存拷貝一份,在新的內存中進行寫操做,寫完以後呢,就將指向原來內存指針指向新的內存,原來的內存就能夠被回收掉了。數組
爲了引出ConcurrentSkipListMap,先帶着你們簡單理解一下跳錶。安全
對於一個單鏈表,即便鏈表是有序的,若是咱們想要在其中查找某個數據,也只能從頭至尾遍歷鏈表,這樣效率天然就會很低,跳錶就不同了。跳錶是一種能夠用來快速查找的數據結構,有點相似於平衡樹。它們均可以對元素進行快速的查找。但一個重要的區別是:對平衡樹的插入和刪除每每極可能致使平衡樹進行一次全局的調整。而對跳錶的插入和刪除只須要對整個數據結構的局部進行操做便可。這樣帶來的好處是:在高併發的狀況下,你會須要一個全局鎖來保證整個平衡樹的線程安全。而對於跳錶,你只須要部分鎖便可。這樣,在高併發環境下,你就能夠擁有更好的性能。而就查詢的性能而言,跳錶的時間複雜度也是 O(logn) 因此在併發數據結構中,JDK 使用跳錶來實現一個 Map。數據結構
跳錶的本質是同時維護了多個鏈表,而且鏈表是分層的,多線程
最低層的鏈表維護了跳錶內全部的元素,每上面一層鏈表都是下面一層的子集。併發
跳錶內的全部鏈表的元素都是排序的。查找時,能夠從頂級鏈表開始找。一旦發現被查找的元素大於當前鏈表中的取值,就會轉入下一層鏈表繼續找。這也就是說在查找過程當中,搜索是跳躍式的。如上圖所示,在跳錶中查找元素18。框架
查找18 的時候原來須要遍歷 18 次,如今只須要 7 次便可。針對鏈表長度比較大的時候,構建索引查找效率的提高就會很是明顯。dom
從上面很容易看出,跳錶是一種利用空間換時間的算法。
使用跳錶實現Map 和使用哈希算法實現Map的另一個不一樣之處是:哈希並不會保存元素的順序,而跳錶內全部的元素都是排序的。所以在對跳錶進行遍歷時,你會獲得一個有序的結果。因此,若是你的應用須要有序性,那麼跳錶就是你不二的選擇。JDK 中實現這一數據結構的類是ConcurrentSkipListMap。
AQS的全稱爲(AbstractQueuedSynchronizer),這個類在java.util.concurrent.locks包下面。
AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用普遍的大量的同步器,好比咱們提到的ReentrantLock,Semaphore,其餘的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基於AQS的。固然,咱們本身也能利用AQS很是輕鬆容易地構造出符合咱們本身需求的同步器。
AQS核心思想是,若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,而且將共享資源設置爲鎖定狀態。若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。
CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列(虛擬的雙向隊列即不存在隊列實例,僅存在結點之間的關聯關係)。AQS是將每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實現鎖的分配。
synchronized 和 ReentrantLock 都是一次只容許一個線程訪問某個資源,Semaphore(信號量)能夠指定多個線程同時訪問某個資源。 示例代碼以下:
/** * * @author Snailclimb * @date 2018年9月30日 * @Description: 須要一次性拿一個許可的狀況 */ public class SemaphoreExample1 { // 請求的數量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 建立一個具備固定線程數量的線程池對象(若是這裏線程池的線程數量給太少的話你會發現執行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); // 一次只能容許執行的線程數量。 final Semaphore semaphore = new Semaphore(20); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表達式的運用 try { semaphore.acquire();// 獲取一個許可,因此可運行線程數量爲20/1=20 test(threadnum); semaphore.release();// 釋放一個許可 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模擬請求的耗時操做 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模擬請求的耗時操做 } }
除了 acquire
方法以外,另外一個比較經常使用的與之對應的方法是tryAcquire
方法,該方法若是獲取不到許可就當即返回false。
CountDownLatch是一個同步工具類,它容許一個或多個線程一直等待,直到其餘線程的操做執行完後再執行。在Java併發中,countdownlatch的概念是一個常見的面試題,因此必定要確保你很好的理解了它。
①某一線程在開始運行前等待n個線程執行完畢。將 CountDownLatch 的計數器初始化爲n :new CountDownLatch(n)
,每當一個任務線程執行完畢,就將計數器減1 countdownlatch.countDown()
,當計數器的值變爲0時,在CountDownLatch上 await()
的線程就會被喚醒。一個典型應用場景就是啓動一個服務時,主線程須要等待多個組件加載完畢,以後再繼續執行。
②實現多個線程開始執行任務的最大並行性。注意是並行性,不是併發,強調的是多個線程在某一時刻同時開始執行。相似於賽跑,將多個線程放到起點,等待發令槍響,而後同時開跑。作法是初始化一個共享的 CountDownLatch
對象,將其計數器初始化爲 1 :new CountDownLatch(1)
,多個線程在開始執行任務前首先 coundownlatch.await()
,當主線程調用 countDown() 時,計數器變爲0,多個線程同時被喚醒。
③死鎖檢測:一個很是方便的使用場景是,你可使用n個線程訪問共享資源,在每次測試階段的線程數目是不一樣的,並嘗試產生死鎖。
/** * * @author SnailClimb * @date 2018年10月1日 * @Description: CountDownLatch 使用方法示例 */ public class CountDownLatchExample1 { // 請求的數量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 建立一個具備固定線程數量的線程池對象(若是這裏線程池的線程數量給太少的話你會發現執行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表達式的運用 try { test(threadnum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { countDownLatch.countDown();// 表示一個請求已經被完成 } }); } countDownLatch.await(); threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模擬請求的耗時操做 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模擬請求的耗時操做 } }
上面的代碼中,咱們定義了請求的數量爲550,當這550個請求被處理完成以後,纔會執行System.out.println("finish");
。
與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。
其餘N個線程必須引用閉鎖對象,由於他們須要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是經過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。因此當N個線程都調 用了這個方法,count的值等於0,而後主線程就能經過await()方法,恢復執行本身的任務。
CyclicBarrier 和 CountDownLatch 很是相似,它也能夠實現線程間的技術等待,可是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 相似。
CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是 CyclicBarrier(int parties)
,其參數表示屏障攔截的線程數量,每一個線程調用await
方法告訴 CyclicBarrier 我已經到達了屏障,而後當前線程被阻塞。
CyclicBarrier 能夠用於多線程計算數據,最後合併計算結果的應用場景。好比咱們用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個賬戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。
/** * * @author Snailclimb * @date 2018年10月1日 * @Description: 測試 CyclicBarrier 類中帶參數的 await() 方法 */ public class CyclicBarrierExample2 { // 請求的數量 private static final int threadCount = 550; // 須要同步的線程數量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException { // 建立線程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); try { /**等待60秒,保證子線程徹底執行結束*/ cyclicBarrier.await(60, TimeUnit.SECONDS); } catch (Exception e) { System.out.println("-----CyclicBarrierException------"); } System.out.println("threadnum:" + threadnum + "is finish"); } }
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
能夠看到當線程數量也就是請求數量達到咱們定義的 5 個的時候, await
方法以後的方法才被執行。
另外,CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction)
,用於在線程到達屏障時,優先執行barrierAction
,方便處理更復雜的業務場景。示例代碼以下:
/** * * @author SnailClimb * @date 2018年10月1日 * @Description: 新建 CyclicBarrier 的時候指定一個 Runnable */ public class CyclicBarrierExample3 { // 請求的數量 private static final int threadCount = 550; // 須要同步的線程數量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { System.out.println("------當線程數達到以後,優先執行------"); }); public static void main(String[] args) throws InterruptedException { // 建立線程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); cyclicBarrier.await(); System.out.println("threadnum:" + threadnum + "is finish"); } }
threadnum:0is ready threadnum:1is ready threadnum:2is ready threadnum:3is ready threadnum:4is ready ------當線程數達到以後,優先執行------ threadnum:4is finish threadnum:0is finish threadnum:2is finish threadnum:1is finish threadnum:3is finish threadnum:5is ready threadnum:6is ready threadnum:7is ready threadnum:8is ready threadnum:9is ready ------當線程數達到以後,優先執行------ threadnum:9is finish threadnum:5is finish threadnum:6is finish threadnum:8is finish threadnum:7is finish ......
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一個或者多個線程,等待其餘多個線程完成某件事情以後才能執行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多個線程互相等待,直到到達同一個同步點,再繼續一塊兒執行。)