Java線程間通訊方式剖析——Java進階(四)

原創文章,同步發自做者我的博客,轉載請在文章開頭處以超連接註明出處 http://www.jasongj.com/java/thread_communication/java

CountDownLatch

CountDownLatch適用場景

Java多線程編程中常常會碰到這樣一種場景——某個線程須要等待一個或多個線程操做結束(或達到某種狀態)纔開始執行。好比開發一個關發測試工具時,主線程須要等到全部測試線程均執行完成再開始統計總共耗費的時間,此時能夠經過CountDownLatch輕鬆實現。編程

CountDownLatch實例

package com.test.thread;

import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
  public static void main(String[] args) throws InterruptedException {
    int totalThread = 3;
    long start = System.currentTimeMillis();
    CountDownLatch countDown = new CountDownLatch(totalThread);
    for(int i = 0; i < totalThread; i++) {
      final String threadName = "Thread " + i;
      new Thread(() -> {
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "started"));
        try {
          Thread.sleep(1000);
        } catch (Exception ex) {
          ex.printStackTrace();
        }
        countDown.countDown();
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
      }).start();;
    }
    countDown.await();
    long stop = System.currentTimeMillis();
    System.out.println(String.format("Total time : %sms", (stop - start)));
  }
}

執行結果安全

Sun Jun 19 20:34:31 CST 2016  Thread 1 started
Sun Jun 19 20:34:31 CST 2016  Thread 0 started
Sun Jun 19 20:34:31 CST 2016  Thread 2 started
Sun Jun 19 20:34:32 CST 2016  Thread 2 ended
Sun Jun 19 20:34:32 CST 2016  Thread 1 ended
Sun Jun 19 20:34:32 CST 2016  Thread 0 ended
Total time : 1072ms

能夠看到,主線程等待全部3個線程都執行結束後纔開始執行。多線程

CountDownLatch主要接口分析

CountDownLatch工做原理相對簡單,能夠簡單當作一個倒計時器,在構造方法中指定初始值,每次調用countDown()方法時講計數器減1,而await()會等待計數器變爲0。CountDownLatch關鍵接口以下併發

  • countDown() 若是當前計數器的值大於1,則將其減1;若當前值爲1,則將其置爲0並喚醒全部經過await等待的線程;若當前值爲0,則什麼也不作直接返回。
  • await() 等待計數器的值爲0,若計數器的值爲0則該方法返回;若等待期間該線程被中斷,則拋出InterruptedException並清除該線程的中斷狀態。
  • await(long timeout, TimeUnit unit) 在指定的時間內等待計數器的值爲0,若在指定時間內計數器的值變爲0,則該方法返回true;若指定時間內計數器的值仍未變爲0,則返回false;若指定時間內計數器的值變爲0以前當前線程被中斷,則拋出InterruptedException並清除該線程的中斷狀態。
  • getCount() 讀取當前計數器的值,通常用於調試或者測試。

CyclicBarrier

CyclicBarrier適用場景

在《當咱們說線程安全時,到底在說什麼》一文中講過內存屏障,它能保證屏障以前的代碼必定在屏障以後的代碼以前被執行。CyclicBarrier能夠譯爲循環屏障,也有相似的功能。CyclicBarrier能夠在構造時指定須要在屏障前執行await的個數,全部對await的調用都會等待,只到調用await的次數達到預約指,全部等待都會當即被喚醒。ide

從使用場景上來講,CyclicBarrier是讓多個線程互相等待某一事件的發生,而後同時被喚醒。而上文講的CountDownLatch是讓某一線程等待多個線程的狀態,而後該線程被喚醒。工具

CyclicBarrier實例

package com.test.thread;

import java.util.Date;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

  public static void main(String[] args) {
    int totalThread = 5;
    CyclicBarrier barrier = new CyclicBarrier(totalThread);
    
    for(int i = 0; i < totalThread; i++) {
      String threadName = "Thread " + i;
      new Thread(() -> {
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, " is waiting"));
        try {
          barrier.await();
        } catch (Exception ex) {
          ex.printStackTrace();
        }
        System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
      }).start();
    }
  }
}

執行結果以下測試

Sun Jun 19 21:04:49 CST 2016  Thread 1  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 0  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 3  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 2  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 4  is waiting
Sun Jun 19 21:04:49 CST 2016  Thread 4 ended
Sun Jun 19 21:04:49 CST 2016  Thread 0 ended
Sun Jun 19 21:04:49 CST 2016  Thread 2 ended
Sun Jun 19 21:04:49 CST 2016  Thread 1 ended
Sun Jun 19 21:04:49 CST 2016  Thread 3 ended

從執行結果能夠看到,每一個線程都不會在其它全部線程執行await()方法前繼續執行,而等全部線程都執行await()方法後全部線程的等待都被喚醒從而繼續執行。線程

CyclicBarrier主要接口分析

CyclicBarrier提供的關鍵方法以下調試

  • await() 等待其它參與方的到來(調用await())。若是當前調用是最後一個調用,則喚醒全部其它的線程的等待而且若是在構造CyclicBarrier時指定了action,當前線程會去執行該action,而後該方法返回該線程調用await的次序(getParties()-1說明該線程是第一個調用await的,0說明該線程是最後一個執行await的),接着該線程繼續執行await後的代碼;若是該調用不是最後一個調用,則阻塞等待;若是等待過程當中,當前線程被中斷,則拋出InterruptedException;若是等待過程當中,其它等待的線程被中斷,或者其它線程等待超時,或者該barrier被reset,或者當前線程在執行barrier構造時註冊的action時由於拋出異常而失敗,則拋出BrokenBarrierException
  • await(long timeout, TimeUnit unit)await()惟一的不一樣點在於設置了等待超時時間,等待超時時會拋出TimeoutException
  • reset() 該方法會將該barrier重置爲它的初始狀態,並使得全部對該barrier的await調用拋出BrokenBarrierException

Phaser

Phaser適用場景

CountDownLatch和CyclicBarrier都是JDK 1.5引入的,而Phaser是JDK 1.7引入的。Phaser的功能與r
CountDownLatch和CyclicBarrier有部分重疊,同時也提供了更豐富的語義和更靈活的用法。

Phaser顧名思義,與階段相關。Phaser比較適合這樣一種場景,一種任務能夠分爲多個階段,現但願多個線程去處理該批任務,對於每一個階段,多個線程能夠併發進行,可是但願保證只有前面一個階段的任務完成以後才能開始後面的任務。這種場景可使用多個CyclicBarrier來實現,每一個CyclicBarrier負責等待一個階段的任務所有完成。可是使用CyclicBarrier的缺點在於,須要明確知道總共有多少個階段,同時並行的任務數須要提早預約義好,且沒法動態修改。而Phaser可同時解決這兩個問題。

Phaser實例

public class PhaserDemo {

  public static void main(String[] args) throws IOException {
    int parties = 3;
    int phases = 4;
    final Phaser phaser = new Phaser(parties) {
      @Override  
      protected boolean onAdvance(int phase, int registeredParties) {  
          System.out.println("====== Phase : " + phase + " ======");  
          return registeredParties == 0;  
      }  
    };
    
    for(int i = 0; i < parties; i++) {
      int threadId = i;
      Thread thread = new Thread(() -> {
        for(int phase = 0; phase < phases; phase++) {
          System.out.println(String.format("Thread %s, phase %s", threadId, phase));
          phaser.arriveAndAwaitAdvance();
        }
      });
      thread.start();
    }
  }
}

執行結果以下

Thread 0, phase 0
Thread 1, phase 0
Thread 2, phase 0
====== Phase : 0 ======
Thread 2, phase 1
Thread 0, phase 1
Thread 1, phase 1
====== Phase : 1 ======
Thread 1, phase 2
Thread 2, phase 2
Thread 0, phase 2
====== Phase : 2 ======
Thread 0, phase 3
Thread 1, phase 3
Thread 2, phase 3
====== Phase : 3 ======

從上面的結果能夠看到,多個線程必須等到其它線程的同一階段的任務所有完成才能進行到下一個階段,而且每當完成某一階段任務時,Phaser都會執行其onAdvance方法。

Phaser主要接口分析

Phaser主要接口以下

  • arriveAndAwaitAdvance() 當前線程當前階段執行完畢,等待其它線程完成當前階段。若是當前線程是該階段最後一個未到達的,則該方法直接返回下一個階段的序號(階段序號從0開始),同時其它線程的該方法也返回下一個階段的序號。
  • arriveAndDeregister() 該方法當即返回下一階段的序號,而且其它線程須要等待的個數減一,而且把當前線程從以後須要等待的成員中移除。若是該Phaser是另一個Phaser的子Phaser(層次化Phaser會在後文中講到),而且該操做致使當前Phaser的成員數爲0,則該操做也會將當前Phaser從其父Phaser中移除。
  • arrive() 該方法不做任何等待,直接返回下一階段的序號。
  • awaitAdvance(int phase) 該方法等待某一階段執行完畢。若是當前階段不等於指定的階段或者該Phaser已經被終止,則當即返回。該階段數通常由arrive()方法或者arriveAndDeregister()方法返回。返回下一階段的序號,或者返回參數指定的值(若是該參數爲負數),或者直接返回當前階段序號(若是當前Phaser已經被終止)。
  • awaitAdvanceInterruptibly(int phase) 效果與awaitAdvance(int phase)至關,惟一的不一樣在於若該線程在該方法等待時被中斷,則該方法拋出InterruptedException
  • awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果與awaitAdvanceInterruptibly(int phase)至關,區別在於若是超時則拋出TimeoutException
  • bulkRegister(int parties) 註冊多個party。若是當前phaser已經被終止,則該方法無效,並返回負數。若是調用該方法時,onAdvance方法正在執行,則該方法等待其執行完畢。若是該Phaser有父Phaser則指定的party數大於0,且以前該Phaser的party數爲0,那麼該Phaser會被註冊到其父Phaser中。
  • forceTermination() 強制讓該Phaser進入終止狀態。已經註冊的party數不受影響。若是該Phaser有子Phaser,則其全部的子Phaser均進入終止狀態。若是該Phaser已經處於終止狀態,該方法調用不形成任何影響。

Java進階系列

相關文章
相關標籤/搜索