Java併發編程的4個同步輔助類(CountDownLatch、CyclicBarrier、Semaphore、Phaser)

  我在《JDK1.5引入的concurrent包》中,曾經介紹過CountDownLatch、CyclicBarrier兩個類,還給出了CountDownLatch的演示案例。這裏再系統總結下Java併發編程中的4個類CountDownLatch、CyclicBarrier、Semaphore、Phaser。html

  1.CountDownLatchjava

  CountDownLatch能夠理解爲一個計數器在初始化時設置初始值,當一個線程須要等待某些操做先完成時,須要調用await()方法。這個方法讓線程進入休眠狀態直到等待的全部線程都執行完成。每調用一次countDown()方法,內部計數器減1,直到計數器爲0時喚醒。這個能夠理解爲特殊的CyclicBarrier。react

核心方法兩個:countDown()和await()
countDown():使CountDownLatch維護的內部計數器減1,每一個被等待的線程完成的時候調用
await():線程在執行到CountDownLatch的時候會將此線程置於休眠

   案例場景:視頻會議室裏等與會人員到齊了會議才能開始。編程

package com.itszt.test3;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * 視頻會議室裏等與會人員到齊了會議才能開始
 */
public class CountDownLatchTest {
    private static int num=10;//與會人員數量
    public static void main(String[] args) {
        VideoConference conference = new VideoConference(num);
        Thread threadConference = new Thread(conference);
        threadConference.start();//開啓await()方法,在內部計數器爲0以前線程處於等待狀態
        for (int i = 0; i < num; i++) {
            Participant p = new Participant(conference, "Participant " + i);
            Thread t = new Thread(p);
            t.start();
        }
    }
}

//視頻會議類
class VideoConference implements Runnable {
    private final CountDownLatch controller;

    public VideoConference(int number) {
        //計數器內等待其餘線程的初始化數目
        controller = new CountDownLatch(number);
    }

    public void arrive(String name) {
        System.out.printf("%s has arrived.\n", name);
        controller.countDown();//調用countDown()方法,使內部計數器減1
        System.out.printf("VideoConference: Waiting for %d participants.\n", controller.getCount());
    }

    @Override
    public void run() {
        synchronized (VideoConference.class){
            if(controller.getCount()!=0){
                System.out.printf("VideoConference: Initialization: %d participants.\n", controller.getCount());
            }
        }
        try {
            controller.await();//等待,直到CoutDownLatch計數器爲0

            System.out.printf("VideoConference: All the participants have come\n");
            System.out.printf("VideoConference: Let's start...\n");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//參加會議的人員類
class Participant implements Runnable {

    private VideoConference conference;
    private String name;

    public Participant(VideoConference conference, String name) {
        this.conference = conference;
        this.name = name;
    }

    @Override
    public void run() {
        Long duration = (long) (Math.random() * 10);
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        conference.arrive(name);//每到一我的員,CountDownLatch計數器就減小1
    }
}

   代碼執行結果以下:多線程

VideoConference: Initialization: 10 participants.
Participant 3 has arrived.
Participant 7 has arrived.
VideoConference: Waiting for 9 participants.
VideoConference: Waiting for 8 participants.
Participant 4 has arrived.
VideoConference: Waiting for 7 participants.
Participant 9 has arrived.
VideoConference: Waiting for 6 participants.
Participant 2 has arrived.
Participant 1 has arrived.
VideoConference: Waiting for 5 participants.
VideoConference: Waiting for 4 participants.
Participant 5 has arrived.
Participant 8 has arrived.
VideoConference: Waiting for 3 participants.
VideoConference: Waiting for 2 participants.
Participant 0 has arrived.
VideoConference: Waiting for 1 participants.
Participant 6 has arrived.
VideoConference: Waiting for 0 participants.
VideoConference: All the participants have come
VideoConference: Let's start...

   須要注意的是,CountDownLatch是一個線程計數器。等計數器爲0時,那些先前因調用await()方法休眠的線程被喚醒。CountDownLatch可以控制的線程是哪些呢?是那些調用了CountDownLatch的await()方法的線程。案例中,先運行await()方法的線程是視頻會議的線程,而後執行與會者 線程,這裏的處理是每到一位(每建立一個線程並運行run()方法時就使計數器減1)就讓計數器減1,等計數器減爲0時喚醒因調用await()方法進入休眠的線程。這裏的與會者線程就是視頻會議線程要等待的線程。併發

  2.CyclicBarrierapp

  CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。
  當一個線程到達集合點時,它將調用await()方法等待其它的線程。線程調用await()方法後,CyclicBarrier將阻塞這個線程,並將它置入休眠狀態等待其它線程的到來。等最後一個線程調用await()方法時,CyclicBarrier將喚醒全部等待的線程,而後這些線程將繼續執行。CyclicBarrier能夠傳入另外一個Runnable對象做爲初始化參數。當全部的線程都到達集合點後,CyclicBarrier類將Runnable對象做爲線程執行。框架

方法
await():使線程置入休眠直到最後一個線程的到來以後喚醒全部休眠的線程

CyclicBarrier類有兩個經常使用的構造方法:
(1)CyclicBarrier(int parties)
這裏的parties也是一個計數器,例如,初始化時parties裏的計數是3,因而擁有該CyclicBarrier對象的線程當parties的計數爲3時就喚醒,注意:這裏parties裏的計數在運行時當調用CyclicBarrier:await()時,計數就加1,一直加到初始的值。
(2)CyclicBarrier(int parties, Runnable barrierAction)
這裏的parties與上一個構造方法的解釋是同樣的,這裏須要解釋的是第二個入參(Runnable barrierAction),這個參數是一個實現Runnable接口的類的對象,也就是說當parties加到初始值時就觸發barrierAction的內容。

   案例場景:有4個遊戲玩家玩遊戲,遊戲有三個關卡,每一個關卡必需要全部玩家都到達後才能容許經過。其實這個場景裏的玩家中若是有玩家A先到了關卡1,他必須等到其餘全部玩家都到達關卡1時才能經過,也就是說線程之間須要相互等待。這和CountDownLatch的應用場景有區別,CountDownLatch裏的線程是到了運行的目標後繼續幹本身的其餘事情,而這裏的線程須要等待其餘線程後才能繼續完成後面的工做。   dom

package com.itszt.test3;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * 測試CyclicBarrier
 */
public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4,
                new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("全部玩家進入第 2 關!");
                    }
                });
        for (int i = 1; i <= 4; i++) {
            new Thread(new Player(i, cyclicBarrier)).start();
        }
    }
}

/**
 * 玩家類
 *
 * @author itmyhome
 */
class Player implements Runnable {
    private CyclicBarrier cyclicBarrier;
    private int id;

    public Player(int id, CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            System.out.println("玩家" + id + "正在玩第 1 關...");
            cyclicBarrier.await();
            System.out.println("玩家" + id + "進入第 2 關...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

   代碼執行結果以下:jsp

玩家1正在玩第 1 關...
玩家3正在玩第 1 關...
玩家2正在玩第 1 關...
玩家4正在玩第 1 關...
全部玩家進入第 2 關!
玩家4進入第 2 關...
玩家1進入第 2 關...
玩家3進入第 2 關...
玩家2進入第 2 關...

   3.Semaphore
  信號量就是能夠聲明多把鎖(包括一把鎖,此時爲互斥信號量)。
  舉個例子:一個房間若是隻能容納5我的,多出來的人必須在門外面等着。如何去作呢?一個解決辦法就是:房間外面掛着五把鑰匙,每進去一我的就取走一把鑰匙,沒有鑰匙的不能進入該房間,而是在外面等待。每出來一我的就把鑰匙放回原處以方便別人再次進入。   

經常使用方法
acquire():獲取信號量,信號量內部計數器減1
release():釋放信號量,信號量內部計數器加1
tryAcquire():這個方法試圖獲取信號量,若是可以獲取返回true,不然返回false
信號量控制的線程數量在聲明時肯定。例如:
	Semaphore s = new Semaphore(2);

   能夠說,Semaphore是一種在多線程環境下使用的設施,該設施負責協調各個線程,以保證它們可以正確、合理的使用公共資源的設施,也是操做系統中用於控制進程同步互斥的量。Semaphore是一種計數信號量,用於管理一組資源,內部是基於AQS的共享模式。它至關於給線程規定一個量從而控制容許活動的線程數。

AQS(AbstractQueuedSynchronizer,抽象的隊列式同步器)是 java.util.concurrent的基礎。
Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask等雖然各自都有不一樣特徵,
可是簡單看一下源碼,每一個類內部都包含一個以下的內部類定義: abstract static class Sync extends AbstractQueuedSynchronizer;

   全部java.util.concurrent包中的同步器類都聲明瞭一個私有的繼承了AbstractQueuedSynchronizer的內部類,而且把全部同步方法都委託給這個內部類。這樣各個同步器類的公開方法就可使用適合本身的名稱。子類只需定義狀態的檢查與更新相關的方法,這些方法控制着acquire和 release操做。

  AQS維護了一個volatile int state(表明共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這裏volatile是核心關鍵詞。state的訪問方式有三種:

  • getState()
  • setState()
  • compareAndSetState()

  AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。

  不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現如下幾種方法:  

  • isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。

  以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。

  再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。

  通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

  同步器背後的基本思想很是簡單。acquire操做以下:

while (synchronization state does not allow acquire) {
	    enqueue current thread if not already queued;
	    possibly block current thread;
	}
dequeue current thread if it was queued;

   release操做以下:  

update synchronization state;
	if (state may permit a blocked thread to acquire)
	    unblock one or more queued threads;

   爲了實現上述操做,須要下面三個基本組件的相互協做:

  • 同步狀態的原子性管理;
  • 線程的阻塞與解除阻塞;
  • 隊列的管理;

  同步器框架的核心決策是爲這三個組件選擇一個具體實現,同時在使用方式上又有大量選項可用。這裏有意地限制了其適用範圍,可是提供了足夠的效率,使得實際上沒有理由在合適的狀況下不用這個框架而去從新建造一個。

  到此,咱們再繼續看Semaphore同步器。爲了簡單起見,咱們以一個停車場的運做爲例。假設停車場只有三個車位,一開始三個車位都是空的。這時,若是同時來了五輛車,看門人容許其中三輛不受阻礙地進入,而後放下車攔,剩下的車則必須在停車場外的入口處等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,升起車攔,放入一輛,若是又離開兩輛,則又能夠放入兩輛,如此往復。在這個場景中,每輛車就比如一個線程,看門人就比如一個信號量,看門人限制了能夠活動的線程。假如裏面依然是三個車位,可是看門人改變了規則,要求每次只能停兩輛車,那麼停車場在進入兩輛車後,其後的車輛就要等到有車離開才能獲准許進入。對於Semaphore類而言,就如同一個看門人,限制了可活動的線程數。

  Semaphore的主要方法有:  

Semaphore(int permits):構造方法,建立具備給定許可數的計數信號量並設置爲非公平信號量。
Semaphore(int permits,boolean fair):構造方法,當fair等於true時,建立具備給定許可數的計數信號量並設置爲公平信號量。
void acquire():從該信號量獲取一個許可前,線程將一直阻塞。至關於一輛車佔了一個車位。
void acquire(int n):從該信號量獲取給定數目許可,在提供這些許可前,一直將線程阻塞。好比n=2,就至關於一輛車佔了兩個車位。
void release():釋放一個許可,將其返回給信號量。就如同車開走返回一個車位。
void release(int n):釋放n個許可。
int availablePermits():當前可用的許可數。

   接下來寫一個案例,有7我的,各自獲取信號量的許可後,再釋放許可。

package com.itszt.test3;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 測試Semaphore
 */
public class SemaphoreTest {
    private static final Semaphore semaphore = new Semaphore(3);//默認爲非公平信號量
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
    //信號量控制的線程
    private static class InformationThread extends Thread {
        private final String name;
        private final int age;

        public InformationThread(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + ":你們好,我是" + name + "我今年" + age + "歲當前時間爲:" + System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println(name + "要準備釋放許可證了,當前時間爲:" + System.currentTimeMillis());
                System.out.println("當前可以使用的許可數爲:" + semaphore.availablePermits());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        String[] name = {"李明", "王五", "張傑", "王強", "趙二", "李四", "張三"};
        int[] age = {26, 27, 33, 45, 19, 23, 41};
        for (int i = 0; i < 7; i++) {
            Thread t1 = new InformationThread(name[i], age[i]);
            threadPool.execute(t1);
        }
    }
}

   上述代碼執行結果以下:

pool-1-thread-1:你們好,我是李明我今年26歲當前時間爲:1524367640560
pool-1-thread-3:你們好,我是張傑我今年33歲當前時間爲:1524367640560
pool-1-thread-2:你們好,我是王五我今年27歲當前時間爲:1524367640560
李明要準備釋放許可證了,當前時間爲:1524367641560
王五要準備釋放許可證了,當前時間爲:1524367641560
張傑要準備釋放許可證了,當前時間爲:1524367641560
當前可以使用的許可數爲:0
當前可以使用的許可數爲:0
當前可以使用的許可數爲:0
pool-1-thread-4:你們好,我是王強我今年45歲當前時間爲:1524367641560
pool-1-thread-5:你們好,我是趙二我今年19歲當前時間爲:1524367641560
pool-1-thread-2:你們好,我是李四我今年23歲當前時間爲:1524367641560
李四要準備釋放許可證了,當前時間爲:1524367642563
趙二要準備釋放許可證了,當前時間爲:1524367642563
王強要準備釋放許可證了,當前時間爲:1524367642563
當前可以使用的許可數爲:0
當前可以使用的許可數爲:0
pool-1-thread-3:你們好,我是張三我今年41歲當前時間爲:1524367642563
當前可以使用的許可數爲:0
張三要準備釋放許可證了,當前時間爲:1524367643563
當前可以使用的許可數爲:2

   咱們上面用的是非公平信號量,改成公平信號量:

private static final Semaphore semaphore = new Semaphore(3,true);

   這時運行結果以下:

pool-1-thread-2:你們好,我是王五我今年27歲當前時間爲:1524367824968
pool-1-thread-3:你們好,我是張傑我今年33歲當前時間爲:1524367824968
pool-1-thread-1:你們好,我是李明我今年26歲當前時間爲:1524367824968
李明要準備釋放許可證了,當前時間爲:1524367825968
王五要準備釋放許可證了,當前時間爲:1524367825968
張傑要準備釋放許可證了,當前時間爲:1524367825968
當前可以使用的許可數爲:0
當前可以使用的許可數爲:0
當前可以使用的許可數爲:0
pool-1-thread-5:你們好,我是趙二我今年19歲當前時間爲:1524367825968
pool-1-thread-4:你們好,我是王強我今年45歲當前時間爲:1524367825968
pool-1-thread-3:你們好,我是李四我今年23歲當前時間爲:1524367825968
王強要準備釋放許可證了,當前時間爲:1524367826968
李四要準備釋放許可證了,當前時間爲:1524367826968
趙二要準備釋放許可證了,當前時間爲:1524367826968
當前可以使用的許可數爲:0
當前可以使用的許可數爲:0
pool-1-thread-1:你們好,我是張三我今年41歲當前時間爲:1524367826968
當前可以使用的許可數爲:0
張三要準備釋放許可證了,當前時間爲:1524367827968
當前可以使用的許可數爲:2

   Semaphore信號量的實現和ReetrantLock相似,都是經過內部類Sync,Sync是一個繼承於AQS的抽象類;    Semaphore信號量和ReentrantLock互斥鎖的實現區別在於,ReentrantLock互斥鎖的state若是爲0則表示鎖未被佔用,若是爲0以外的數值表示鎖被重入的次數;Semaphore信號量的state表示許可的數目;    Sync包括兩個子類:公平信號量FairSync和非公平信號量NonfailrSync,默認是非公平信號量NonfairSync。其中,公平信號量是指若是線程不在同步隊列頭部則排隊等候;非公平信號量是指不管當前線程是否在同步隊列頭部,都會嘗試獲取信號量。

  信號量若是要實現單例模式,能夠這樣修改:

    private static final Semaphore semaphore=new Semaphore(1);  

   再執行代碼,結果則以下:

pool-1-thread-1:你們好,我是李明我今年26歲當前時間爲:1524368235314
李明要準備釋放許可證了,當前時間爲:1524368236317
當前可以使用的許可數爲:0
pool-1-thread-3:你們好,我是張傑我今年33歲當前時間爲:1524368236317
張傑要準備釋放許可證了,當前時間爲:1524368237317
當前可以使用的許可數爲:0
pool-1-thread-3:你們好,我是張三我今年41歲當前時間爲:1524368237317
張三要準備釋放許可證了,當前時間爲:1524368238317
當前可以使用的許可數爲:0
pool-1-thread-5:你們好,我是趙二我今年19歲當前時間爲:1524368238317
趙二要準備釋放許可證了,當前時間爲:1524368239317
當前可以使用的許可數爲:0
pool-1-thread-2:你們好,我是王五我今年27歲當前時間爲:1524368239317
王五要準備釋放許可證了,當前時間爲:1524368240317
當前可以使用的許可數爲:0
pool-1-thread-4:你們好,我是王強我今年45歲當前時間爲:1524368240317
王強要準備釋放許可證了,當前時間爲:1524368241317
當前可以使用的許可數爲:0
pool-1-thread-1:你們好,我是李四我今年23歲當前時間爲:1524368241317
李四要準備釋放許可證了,當前時間爲:1524368242317
當前可以使用的許可數爲:0

   可見,Semaphore將給定許可數設置爲1,就如同一個單例模式,即單個停車位,只有一輛車進,而後這輛車出來後,下一輛車才能進。

  另外,咱們在上面的案例中用到了線程池:

private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());

   其中,ThreadPoolExecutor的構造方法體系有:

ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue) 

ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory)

ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        RejectedExecutionHandler handler)

ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler)

   對於構造方法的參數說明以下:

corePoolSize
	核心線程數,默認狀況下核心線程會一直存活,即便處於閒置狀態也不會受keepAliveTime限制。除非將allowCoreThreadTimeOut設置爲true。
maximumPoolSize
	線程池所能容納的最大線程數。超過這個數的線程將被阻塞。當任務隊列爲沒有設置大小的LinkedBlockingDeque時,這個值無效。
keepAliveTime
	非核心線程的閒置超時時間,超過這個時間就會被回收。
unit
	指定keepAliveTime的單位,如TimeUnit.SECONDS。當將allowCoreThreadTimeOut設置爲true時對corePoolSize生效。
workQueue
	線程池中的任務隊列。
	經常使用的隊列有:LinkedBlockingQueue,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。
threadFactory
	線程工廠,提供建立新線程的功能。ThreadFactory是一個接口,只有一個方法:
		public interface ThreadFactory {
			Thread newThread(Runnable r);
		}
RejectedExecutionHandler
	RejectedExecutionHandler也是一個接口,只有一個方法
		public interface RejectedExecutionHandler {
			void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
		}
		當線程池中的資源已經所有使用,添加新線程又被拒絕時,會調用RejectedExecutionHandler的rejectedExecution方法。

   線程池的線程執行規則跟任務隊列有很大的關係。其中:

(1)在任務隊列沒有大小限制時:
    ①若是線程數量<=核心線程數量,那麼直接啓動一個核心線程來執行任務,不會放入隊列中。
   ② 若是線程數量>核心線程數,但<=最大線程數,而且任務隊列是LinkedBlockingDeque的時候,超過核心線程數量的任務會放在任務隊列中排隊。
    ③若是線程數量>核心線程數,但<=最大線程數,而且任務隊列是SynchronousQueue的時候,線程池會建立新線程執行任務,這些任務也不會被放在任務隊列中。這些線程屬於非核心線程,在任務完成後,閒置時間達到了超時時間就會被清除。
    ④若是線程數量>核心線程數,而且>最大線程數,當任務隊列是LinkedBlockingDeque時,會將超過核心線程的任務放在任務隊列中排隊。也就是說,當任務隊列是LinkedBlockingDeque而且沒有大小限制時,線程池的最大線程數設置是無效的,它的線程數最多不會超過核心線程數。
    ⑤若是線程數量>核心線程數,而且>最大線程數,當任務隊列是SynchronousQueue的時候,會由於線程池拒絕添加任務而拋出異常。
(2)在任務隊列大小有限時:
    ①當LinkedBlockingDeque塞滿時,新增的任務會直接建立新線程來執行,當建立的線程數量超過最大線程數量時會拋異常。
    ②SynchronousQueue沒有數量限制。由於它根本不保持這些任務,而是直接交給線程池去執行。當任務數量超過最大線程數時會直接拋異常。 

  在ThreadPoolExecutor中用到了BlockingQueue阻塞隊列的接口。請參考個人另外一篇博文《Java中的BlockingQueue》。  

   4.Phaser

  Phaser是一個更加複雜和強大的同步輔助類,它容許併發執行多階段任務。當咱們有併發任務而且須要分解成幾步執行時,(CyclicBarrier是分紅兩步),就能夠選擇使用Phaser。Phaser類機制是在每一步結束的位置對線程進行同步,當全部的線程都完成了這一步,才容許執行下一步。

  能夠說,Phaser容許併發多階段任務。Phaser類機制是在每一步結束的位置對線程進行同步,當全部的線程都完成了這一步,才容許執行下一步。
  跟其餘同步工具同樣,必須對Phaser類中參與同步操做的任務數進行初始化,不一樣的是,能夠動態的增長或者減小任務數。  

  一個Phaser對象有兩種狀態:

  • 活躍態(Active):當存在參與同步的線程的時候,Phaser就是活躍的,而且在每一個階段結束的時候進行同步。
  • 終止態(Termination):當全部參與同步的線程都取消註冊的時候,Phaser就處於終止態,在終止狀態下,Phaser沒有任何參與者。當Phaser對象onAdvance()方法返回True時,Phaser對象就處於終止態。當Phaser處於終止態時,同步方法arriveAndAwaitAdvance()會當即返回,並且不會作任何同步操做。
arriveAndAwaitAdvance():相似於CyclicBarrier的await()方法,等待其它線程都到來以後同步繼續執行
arriveAndDeregister():把執行到此的線程從Phaser中註銷掉
isTerminated():判斷Phaser是否終止
register():將一個新的參與者註冊到Phaser中,這個新的參與者將被當成沒有執行完本階段的線程
forceTermination():強制Phaser進入終止態

   案例場景:Phaser將同步三個併發任務。這三個任務將在三個不一樣的文件夾及其子文件夾中查找過去24小時內改過擴展名爲.txt的文件。這個任務分解爲三個步驟:①在指定文件夾及其子文件夾中得到擴展名爲.txt的文件;②對第一步的結果過濾,刪除修改時間超過24小時的文件;③將結果打印數據到控制檯。

package com.itszt.test3;
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
/**
 * 測試Phaser
 */
public class PhaserTest {
    public static void main(String[] args) {
        Phaser phaser=new Phaser(3);
        FileSearch system=new FileSearch("E:\\a", ".txt",
                phaser);
        FileSearch apps=new FileSearch("E:\\b", ".txt",
                phaser);
        FileSearch documents=new FileSearch("E:\\c", ".txt",
                phaser);
        Thread systemThread=new Thread(system, "system-a");
        systemThread.start();
        Thread appsThread=new Thread(apps, "apps-b");
        appsThread.start();
        Thread documentsThread=new Thread(documents, "documents-c");
        documentsThread.start();
        try {
            systemThread.join();
            appsThread.join();
            documentsThread.join();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Terminated:"+ phaser.isTerminated());
    }
}
class FileSearch implements Runnable {
    private String initPath;// 查找路徑
    private String end;// 文件後綴
    private List<String> results;// 結果集
    private Phaser phaser;

    public FileSearch(String initPath, String end, Phaser phaser) {
        this.initPath = initPath;
        this.end = end;
        this.phaser = phaser;
        this.results = new ArrayList<String>();
    }

    private void direactoryProcess(File file) {
        File list[] = file.listFiles();
        if (list != null) {
            for (File f : list) {
                if (f.isDirectory()) {
                    direactoryProcess(f);
                } else {
                    fileProcess(f);
                }
            }
        }
    }

    private void fileProcess(File file) {
        if (file.getName().endsWith(end)) {
            results.add(file.getAbsolutePath());
        }
    }

    private void filterResult() {
        List<String> newResult = new ArrayList<String>();
        long actualDate = new Date().getTime();
        for (int i = 0; i < results.size(); i++) {
            File file = new File(results.get(i));
            long lastModifyTime = file.lastModified();
            if (actualDate - lastModifyTime < TimeUnit.MICROSECONDS.
                    convert(1,
                    TimeUnit.DAYS)) {
                newResult.add(results.get(i));
            }
        }
        results = newResult;
    }

    private boolean checkResults() {
        if (results.isEmpty()) {
            System.out.println(Thread.currentThread().
                    getName() + ": Phase "
                    + phaser.getPhase() + " 0 result");
            System.out.println(Thread.currentThread().
                    getName() + ": Phase "
                    + phaser.getPhase() + " end");
            phaser.arriveAndDeregister();
            return false;
        } else {
            System.out.println(Thread.currentThread().
                    getName() + ": Phase "
                    + phaser.getPhase() + " " +
                    results.size() + " result");
            phaser.arriveAndAwaitAdvance();
            return true;
        }
    }

    private void showInfo() {
        for (int i = 0; i < results.size(); i++) {
            System.out.println(Thread.currentThread().
                    getName() + ":"
                    + results.get(i));
        }
        phaser.arriveAndAwaitAdvance();
    }

    @Override
    public void run() {
        phaser.arriveAndAwaitAdvance();
        System.out.println(Thread.currentThread().
                getName()+": Starting");
        File file=new File(initPath);
        if(file.isDirectory()){
            direactoryProcess(file);
        }
        if(!checkResults()){
            return;
        }
        filterResult();
        if(!checkResults()){
            return;
        }
        showInfo();
        phaser.arriveAndDeregister();
        System.out.println(Thread.currentThread().
                getName()+": Work completed");
    }
}

  控制檯打印以下:

system-a: Starting
system-a: Phase 1 1 result
apps-b: Starting
documents-c: Starting
documents-c: Phase 1 1 result
apps-b: Phase 1 1 result
apps-b: Phase 2 1 result
system-a: Phase 2 1 result
documents-c: Phase 2 1 result
documents-c:E:\c\jsp技術.txt
apps-b:E:\b\jsp技術.txt
system-a:E:\a\jsp技術.txt
system-a: Work completed
documents-c: Work completed
apps-b: Work completed
Terminated:true
相關文章
相關標籤/搜索