java_guide_9-30_併發相關

3.1 CopyOnWriteArrayList 簡介

public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable

 

在不少應用場景中,讀操做可能會遠遠大於寫操做。因爲讀操做根本不會修改原有的數據,所以對於每次讀取都進行加鎖實際上是一種資源浪費。咱們應該容許多個線程同時訪問List的內部數據,畢竟讀取操做是安全的。java

這和咱們以前在多線程章節講過 ReentrantReadWriteLock 讀寫鎖的思想很是相似,也就是讀讀共享、寫寫互斥、讀寫互斥、寫讀互斥。JDK中提供了 CopyOnWriteArrayList 類比相比於在讀寫鎖的思想又更進一步。爲了將讀取的性能發揮到極致,CopyOnWriteArrayList 讀取是徹底不用加鎖的,而且更厲害的是:寫入也不會阻塞讀取操做。只有寫入和寫入之間須要進行同步等待。這樣一來,讀操做的性能就會大幅度提高。那它是怎麼作的呢?面試

3.2 CopyOnWriteArrayList 是如何作到的?

CopyOnWriteArrayList 類的全部可變操做(add,set等等)都是經過建立底層數組的新副原本實現的。當 List 須要被修改的時候,我並不修改原有內容,而是對原有數據進行一次複製,將修改的內容寫入副本。寫完以後,再將修改完的副本替換原來的數據,這樣就能夠保證寫操做不會影響讀操做了。算法

從 CopyOnWriteArrayList 的名字就能看出CopyOnWriteArrayList 是知足CopyOnWrite 的ArrayList,所謂CopyOnWrite 也就是說:在計算機,若是你想要對一塊內存進行修改時,咱們不在原有內存塊中進行寫操做,而是將內存拷貝一份,在新的內存中進行寫操做,寫完以後呢,就將指向原來內存指針指向新的內存,原來的內存就能夠被回收掉了。數組

六 ConcurrentSkipListMap

爲了引出ConcurrentSkipListMap,先帶着你們簡單理解一下跳錶。安全

對於一個單鏈表,即便鏈表是有序的,若是咱們想要在其中查找某個數據,也只能從頭至尾遍歷鏈表,這樣效率天然就會很低,跳錶就不同了。跳錶是一種能夠用來快速查找的數據結構,有點相似於平衡樹。它們均可以對元素進行快速的查找。但一個重要的區別是:對平衡樹的插入和刪除每每極可能致使平衡樹進行一次全局的調整。而對跳錶的插入和刪除只須要對整個數據結構的局部進行操做便可。這樣帶來的好處是:在高併發的狀況下,你會須要一個全局鎖來保證整個平衡樹的線程安全。而對於跳錶,你只須要部分鎖便可。這樣,在高併發環境下,你就能夠擁有更好的性能。而就查詢的性能而言,跳錶的時間複雜度也是 O(logn) 因此在併發數據結構中,JDK 使用跳錶來實現一個 Map。數據結構

跳錶的本質是同時維護了多個鏈表,而且鏈表是分層的,多線程

最低層的鏈表維護了跳錶內全部的元素,每上面一層鏈表都是下面一層的子集。併發

跳錶內的全部鏈表的元素都是排序的。查找時,能夠從頂級鏈表開始找。一旦發現被查找的元素大於當前鏈表中的取值,就會轉入下一層鏈表繼續找。這也就是說在查找過程當中,搜索是跳躍式的。如上圖所示,在跳錶中查找元素18。框架

查找18 的時候原來須要遍歷 18 次,如今只須要 7 次便可。針對鏈表長度比較大的時候,構建索引查找效率的提高就會很是明顯。dom

從上面很容易看出,跳錶是一種利用空間換時間的算法。

使用跳錶實現Map 和使用哈希算法實現Map的另一個不一樣之處是:哈希並不會保存元素的順序,而跳錶內全部的元素都是排序的。所以在對跳錶進行遍歷時,你會獲得一個有序的結果。因此,若是你的應用須要有序性,那麼跳錶就是你不二的選擇。JDK 中實現這一數據結構的類是ConcurrentSkipListMap。


 

1 AQS 簡單介紹

AQS的全稱爲(AbstractQueuedSynchronizer),這個類在java.util.concurrent.locks包下面。

AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用普遍的大量的同步器,好比咱們提到的ReentrantLock,Semaphore,其餘的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基於AQS的。固然,咱們本身也能利用AQS很是輕鬆容易地構造出符合咱們本身需求的同步器。

2.1 AQS 原理概覽

AQS核心思想是,若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,而且將共享資源設置爲鎖定狀態。若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。

CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列(虛擬的雙向隊列即不存在隊列實例,僅存在結點之間的關聯關係)。AQS是將每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實現鎖的分配。

 

 

 


 

3 Semaphore(信號量)-容許多個線程同時訪問

synchronized 和 ReentrantLock 都是一次只容許一個線程訪問某個資源,Semaphore(信號量)能夠指定多個線程同時訪問某個資源。 示例代碼以下:

/**
 * 
 * @author Snailclimb
 * @date 2018年9月30日
 * @Description: 須要一次性拿一個許可的狀況
 */
public class SemaphoreExample1 {
  // 請求的數量
  private static final int threadCount = 550;

  public static void main(String[] args) throws InterruptedException {
    // 建立一個具備固定線程數量的線程池對象(若是這裏線程池的線程數量給太少的話你會發現執行的很慢)
    ExecutorService threadPool = Executors.newFixedThreadPool(300);
    // 一次只能容許執行的線程數量。
    final Semaphore semaphore = new Semaphore(20);

    for (int i = 0; i < threadCount; i++) {
      final int threadnum = i;
      threadPool.execute(() -> {// Lambda 表達式的運用
        try {
          semaphore.acquire();// 獲取一個許可,因此可運行線程數量爲20/1=20
          test(threadnum);
          semaphore.release();// 釋放一個許可
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }

      });
    }
    threadPool.shutdown();
    System.out.println("finish");
  }

  public static void test(int threadnum) throws InterruptedException {
    Thread.sleep(1000);// 模擬請求的耗時操做
    System.out.println("threadnum:" + threadnum);
    Thread.sleep(1000);// 模擬請求的耗時操做
  }
}
View Code

 

除了 acquire方法以外,另外一個比較經常使用的與之對應的方法是tryAcquire方法,該方法若是獲取不到許可就當即返回false。

4 CountDownLatch (倒計時器)

CountDownLatch是一個同步工具類,它容許一個或多個線程一直等待,直到其餘線程的操做執行完後再執行。在Java併發中,countdownlatch的概念是一個常見的面試題,因此必定要確保你很好的理解了它。

4.1 CountDownLatch 的三種典型用法

①某一線程在開始運行前等待n個線程執行完畢。將 CountDownLatch 的計數器初始化爲n :new CountDownLatch(n),每當一個任務線程執行完畢,就將計數器減1 countdownlatch.countDown(),當計數器的值變爲0時,在CountDownLatch上 await() 的線程就會被喚醒。一個典型應用場景就是啓動一個服務時,主線程須要等待多個組件加載完畢,以後再繼續執行。

②實現多個線程開始執行任務的最大並行性。注意是並行性,不是併發,強調的是多個線程在某一時刻同時開始執行。相似於賽跑,將多個線程放到起點,等待發令槍響,而後同時開跑。作法是初始化一個共享的 CountDownLatch 對象,將其計數器初始化爲 1 :new CountDownLatch(1),多個線程在開始執行任務前首先 coundownlatch.await(),當主線程調用 countDown() 時,計數器變爲0,多個線程同時被喚醒。

③死鎖檢測:一個很是方便的使用場景是,你可使用n個線程訪問共享資源,在每次測試階段的線程數目是不一樣的,並嘗試產生死鎖。

/**
 * 
 * @author SnailClimb
 * @date 2018年10月1日
 * @Description: CountDownLatch 使用方法示例
 */
public class CountDownLatchExample1 {
  // 請求的數量
  private static final int threadCount = 550;

  public static void main(String[] args) throws InterruptedException {
    // 建立一個具備固定線程數量的線程池對象(若是這裏線程池的線程數量給太少的話你會發現執行的很慢)
    ExecutorService threadPool = Executors.newFixedThreadPool(300);
    final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; i++) {
      final int threadnum = i;
      threadPool.execute(() -> {// Lambda 表達式的運用
        try {
          test(threadnum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } finally {
          countDownLatch.countDown();// 表示一個請求已經被完成
        }

      });
    }
    countDownLatch.await();
    threadPool.shutdown();
    System.out.println("finish");
  }

  public static void test(int threadnum) throws InterruptedException {
    Thread.sleep(1000);// 模擬請求的耗時操做
    System.out.println("threadnum:" + threadnum);
    Thread.sleep(1000);// 模擬請求的耗時操做
  }
}

 

上面的代碼中,咱們定義了請求的數量爲550,當這550個請求被處理完成以後,纔會執行System.out.println("finish");

與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。

其餘N個線程必須引用閉鎖對象,由於他們須要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是經過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。因此當N個線程都調 用了這個方法,count的值等於0,而後主線程就能經過await()方法,恢復執行本身的任務。


 

5 CyclicBarrier(循環柵欄)

CyclicBarrier 和 CountDownLatch 很是相似,它也能夠實現線程間的技術等待,可是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 相似。

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴 CyclicBarrier 我已經到達了屏障,而後當前線程被阻塞。

5.1 CyclicBarrier 的應用場景

CyclicBarrier 能夠用於多線程計算數據,最後合併計算結果的應用場景。好比咱們用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個賬戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。

/**
 * 
 * @author Snailclimb
 * @date 2018年10月1日
 * @Description: 測試 CyclicBarrier 類中帶參數的 await() 方法
 */
public class CyclicBarrierExample2 {
  // 請求的數量
  private static final int threadCount = 550;
  // 須要同步的線程數量
  private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

  public static void main(String[] args) throws InterruptedException {
    // 建立線程池
    ExecutorService threadPool = Executors.newFixedThreadPool(10);

    for (int i = 0; i < threadCount; i++) {
      final int threadNum = i;
      Thread.sleep(1000);
      threadPool.execute(() -> {
        try {
          test(threadNum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } catch (BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      });
    }
    threadPool.shutdown();
  }

  public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
    System.out.println("threadnum:" + threadnum + "is ready");
    try {
      /**等待60秒,保證子線程徹底執行結束*/  
      cyclicBarrier.await(60, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.out.println("-----CyclicBarrierException------");
    }
    System.out.println("threadnum:" + threadnum + "is finish");
  }

}
View Code

 

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......

 

能夠看到當線程數量也就是請求數量達到咱們定義的 5 個的時候, await方法以後的方法才被執行。

另外,CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。示例代碼以下:

/**
 * 
 * @author SnailClimb
 * @date 2018年10月1日
 * @Description: 新建 CyclicBarrier 的時候指定一個 Runnable
 */
public class CyclicBarrierExample3 {
  // 請求的數量
  private static final int threadCount = 550;
  // 須要同步的線程數量
  private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
    System.out.println("------當線程數達到以後,優先執行------");
  });

  public static void main(String[] args) throws InterruptedException {
    // 建立線程池
    ExecutorService threadPool = Executors.newFixedThreadPool(10);

    for (int i = 0; i < threadCount; i++) {
      final int threadNum = i;
      Thread.sleep(1000);
      threadPool.execute(() -> {
        try {
          test(threadNum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } catch (BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      });
    }
    threadPool.shutdown();
  }

  public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
    System.out.println("threadnum:" + threadnum + "is ready");
    cyclicBarrier.await();
    System.out.println("threadnum:" + threadnum + "is finish");
  }

}
View Code

 

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------當線程數達到以後,優先執行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------當線程數達到以後,優先執行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......

 


 

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一個或者多個線程,等待其餘多個線程完成某件事情以後才能執行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多個線程互相等待,直到到達同一個同步點,再繼續一塊兒執行。)

相關文章
相關標籤/搜索