(十一)java多線程之Phaser

本人郵箱: <kco1989@qq.com>
歡迎轉載,轉載請註明網址 http://blog.csdn.net/tianshi_kco
github: https://github.com/kco1989/kco
代碼已經所有託管github有須要的同窗自行下載java

引言

講完了CyclicBarrierCountDownLatch,今天講一個跟這兩個類有點相似的Phaser.->移相器git

java7中引入了一種新的可重複使用的同步屏障,稱爲移相器Phaser.Phaser擁有與CyclicBarrierCountDownLatch相似的功勞.可是這個類提供了更加靈活的應用.CountDownLatchCyclicBarrier都是隻適用於固定數量的參與者.移相器適用於可變數目的屏障,在這個意義上,能夠在任什麼時候間註冊新的參與者.而且在抵達屏障是能夠註銷已經註冊的參與者.所以,註冊到同步移相器的參與者的數目可能會隨着時間的推移而變化.如CyclicBarrier同樣,移相器能夠重複使用,這意味着當前參與者到達移相器後,能夠再一次註冊本身並等待另外一次到達.所以,移相器會有多代.一旦爲某個特定相位註冊的全部參與者都到達移相器,就增長相數.相數從零開始,在達到Integer.MAX_VALUE後,再次繞回0.當移相器發生變化時,經過重寫onAdvance方法,能夠自行可選操做.這個方法也可用於終止移相器.移相器一旦被終止,全部的同步方法就會當即返回,並嘗試註冊新的失敗的參與者.
移相器的另外一個重要特徵是:移相器多是分層的,這容許你以樹形結構來安排移相器以減小競爭.很明顯,更小的組將擁有更少的競爭同步的參與者.所以,將大量的參與者分紅較小的組能夠減小競爭.雖然建立移相器能增長中的吞吐量,可是這須要更多的開銷.最後,移相器的另外一個重要的特徵在於監控功能,使用獨立的對象能夠監視移相器的當前狀態.監視器能夠查詢註冊到移相器的參與者的數量,以及已經到達和尚未到達某個特定相數的參與者的數量.1github

例子1 用Phaser代替CyclicBarrier

將以前(九)java多線程之CyclicBarrier旅遊的例子改寫一下,微信

Phaser替代CyclicBarrier比較簡單,CyclicBarrier的await()方法能夠直接用Phaser的arriveAndAwaitAdvance()方法替代
CyclicBarrierPhaser:CyclicBarrier只適用於固定數量的參與者,而Phaser適用於可變數目的屏障.多線程

  • TourismRunnable 旅遊類dom

public class TourismRunnable implements Runnable{
    Phaser phaser;
    Random random;
    public TourismRunnable(Phaser phaser) {
        this.phaser = phaser;
        this.random = new Random();
    }

    @Override
    public void run() {
        tourism();
    }

    /**
     * 旅遊過程
     */
    private void tourism() {
        goToStartingPoint();
        goToHotel();
        goToTourismPoint1();
        goToTourismPoint2();
        goToTourismPoint3();
        goToEndPoint();
    }

    /**
     * 裝備返程
     */
    private void goToEndPoint() {
        goToPoint("飛機場,準備登機回家");
    }

    /**
     * 到達旅遊點3
     */
    private void goToTourismPoint3() {
        goToPoint("旅遊點3");
    }

    /**
     * 到達旅遊點2
     */
    private void goToTourismPoint2() {
        goToPoint("旅遊點2");
    }

    /**
     * 到達旅遊點1
     */
    private void goToTourismPoint1() {
        goToPoint("旅遊點1");
    }

    /**
     * 入住酒店
     */
    private void goToHotel() {
        goToPoint("酒店");
    }

    /**
     * 出發點集合
     */
    private void goToStartingPoint() {
        goToPoint("出發點");
    }

    private int getRandomTime(){
        int time = this.random.nextInt(400) + 100;
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return time;
    }

    private void goToPoint(String point){
        try {
            String name = Thread.currentThread().getName();
            System.out.println(name + " 花了 " + getRandomTime() + " 時間纔到了" + point);
            phaser.arriveAndAwaitAdvance();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • TestMain 測試類ide

public class TestMain {

    public static void main(String[] args) {
        String name = "明剛紅麗黑白";
        Phaser phaser = new Phaser(name.length());
        List<Thread> tourismThread = new ArrayList<>();
        for (char ch : name.toCharArray()){
            tourismThread.add(new Thread(new TourismRunnable(phaser), "小" + ch));
        }
        for (Thread thread : tourismThread){
            thread.start();
        }
    }
}

運行結果學習

小紅 花了 122 時間纔到了出發點
小明 花了 259 時間纔到了出發點
小白 花了 267 時間纔到了出發點
小麗 花了 306 時間纔到了出發點
小剛 花了 385 時間纔到了出發點
小黑 花了 486 時間纔到了出發點
小白 花了 299 時間纔到了酒店
小剛 花了 345 時間纔到了酒店
小黑 花了 449 時間纔到了酒店
小麗 花了 452 時間纔到了酒店
小明 花了 462 時間纔到了酒店
小紅 花了 480 時間纔到了酒店
小麗 花了 107 時間纔到了旅遊點1
小紅 花了 141 時間纔到了旅遊點1
小明 花了 212 時間纔到了旅遊點1
小黑 花了 286 時間纔到了旅遊點1
小白 花了 305 時間纔到了旅遊點1
小剛 花了 386 時間纔到了旅遊點1
小麗 花了 119 時間纔到了旅遊點2
小黑 花了 222 時間纔到了旅遊點2
小明 花了 259 時間纔到了旅遊點2
小剛 花了 299 時間纔到了旅遊點2
小紅 花了 354 時間纔到了旅遊點2
小白 花了 422 時間纔到了旅遊點2
小麗 花了 112 時間纔到了旅遊點3
小白 花了 182 時間纔到了旅遊點3
小剛 花了 283 時間纔到了旅遊點3
小明 花了 295 時間纔到了旅遊點3
小紅 花了 386 時間纔到了旅遊點3
小黑 花了 483 時間纔到了旅遊點3
小黑 花了 152 時間纔到了飛機場,準備登機回家
小白 花了 178 時間纔到了飛機場,準備登機回家
小明 花了 248 時間纔到了飛機場,準備登機回家
小紅 花了 362 時間纔到了飛機場,準備登機回家
小麗 花了 428 時間纔到了飛機場,準備登機回家
小剛 花了 432 時間纔到了飛機場,準備登機回家
  • Phaser(int parties) 建立一個指定parties個線程參與同步任務.測試

  • ``this

例子2 用Phaser代替CountDownLatch

將以前(十)java多線程之CountDownLatch旅遊回來坐飛機的例子改寫一下,

CountDownLatch主要使用的有2個方法

  • await()方法,可使線程進入等待狀態,在Phaser中,與之對應的方法是awaitAdvance(int n)

  • countDown(),使計數器減一,當計數器爲0時全部等待的線程開始執行,在Phaser中,與之對應的方法是arrive()

  • Airplane飛機類

public class Airplane {
    private Phaser phaser;
    private Random random;
    public Airplane(int peopleNum){
        phaser = new Phaser(peopleNum);
        random = new Random();
    }

    /**
     * 下機
     */
    public void getOffPlane(){
        try {
            String name = Thread.currentThread().getName();
            Thread.sleep(random.nextInt(500));
            System.out.println(name + " 在飛機在休息着....");
            Thread.sleep(random.nextInt(500));
            System.out.println(name + " 下飛機了");
            phaser.arrive();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doWork(){

        String name = Thread.currentThread().getName();
        System.out.println(name + "準備作 清理 工做");
        phaser.awaitAdvance(phaser.getPhase());
        System.out.println("飛機的乘客都下機," + name + "能夠開始作 清理 工做");

    }

}
  • TestMain 測試類(沒有改)

public class TestMain {

    public static void main(String[] args) {
        String visitor = "明剛紅麗黑白";
        String kongjie = "美惠花";

        Airplane airplane = new Airplane(visitor.length());
        Set<Thread> threads = new HashSet<>();
        for (int i = 0; i < visitor.length(); i ++){
            threads.add(new Thread(() -> {
                airplane.getOffPlane();
            }, "小" + visitor.charAt(i)));
        }
        for (int i = 0; i < kongjie.length(); i ++){
            threads.add(new Thread(() ->{
                airplane.doWork();
            }, "小" + kongjie.charAt(i) + "空姐"));
        }

        for (Thread thread : threads){
            thread.start();
        }
    }
}

運行結果

小花空姐準備作 清理 工做
小惠空姐準備作 清理 工做
小美空姐準備作 清理 工做
小黑 在飛機在休息着....
小明 在飛機在休息着....
小紅 在飛機在休息着....
小麗 在飛機在休息着....
小剛 在飛機在休息着....
小明 下飛機了
小紅 下飛機了
小黑 下飛機了
小白 在飛機在休息着....
小麗 下飛機了
小剛 下飛機了
小白 下飛機了
飛機的乘客都下機,小美空姐能夠開始作 清理 工做
飛機的乘客都下機,小花空姐能夠開始作 清理 工做
飛機的乘客都下機,小惠空姐能夠開始作 清理 工做

例子3 高級用法

前面兩個例子都比較簡單,如今咱們還用Phaser一個比較高級一點用法.仍是用旅遊的例子
假若有這麼一個場景,在旅遊過程當中,有可能很湊巧遇到幾個朋友,而後他們據說大家在旅遊,因此想要加入一塊兒繼續接下來的旅遊.也有可能,在旅遊過程當中,忽然其中有某幾我的臨時有事,想退出此次旅遊了.在自由行的旅遊,這是很常見的一些事情.若是如今咱們使用CyclicBarrier這個類來實現,咱們發現是實現不了,這是用Phaser就可實現這個功能.

  • 首先,咱們改寫旅遊類 TourismRunnable,此次改動相對比較多一點

public class TourismRunnable implements Runnable{
    Phaser phaser;
    Random random;
    /**
     * 每一個線程保存一個朋友計數器,好比小紅第一次遇到一個朋友,則取名`小紅的朋友0號`,
     * 而後旅遊到其餘景點的時候,若是小紅又遇到一個朋友,這取名爲`小紅的朋友1號`
     */
    AtomicInteger frientCount = new AtomicInteger();
    public TourismRunnable(Phaser phaser) {
        this.phaser = phaser;
        this.random = new Random();
    }

    @Override
    public void run() {
        tourism();
    }

    /**
     * 旅遊過程
     */
    private void tourism() {
        switch (phaser.getPhase()){
            case 0:if(!goToStartingPoint()) break;
            case 1:if(!goToHotel()) break;
            case 2:if(!goToTourismPoint1()) break;
            case 3:if(!goToTourismPoint2()) break;
            case 4:if(!goToTourismPoint3()) break;
            case 5:if(!goToEndPoint()) break;
        }
    }

    /**
     * 準備返程
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean goToEndPoint() {
        return goToPoint("飛機場,準備登機回家");
    }

    /**
     * 到達旅遊點3
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean goToTourismPoint3() {
        return goToPoint("旅遊點3");
    }

    /**
     * 到達旅遊點2
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean goToTourismPoint2() {
        return goToPoint("旅遊點2");
    }

    /**
     * 到達旅遊點1
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean goToTourismPoint1() {
        return goToPoint("旅遊點1");
    }

    /**
     * 入住酒店
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean goToHotel() {
        return goToPoint("酒店");
    }

    /**
     * 出發點集合
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean goToStartingPoint() {
        return goToPoint("出發點");
    }

    private int getRandomTime() throws InterruptedException {
        int time = random.nextInt(400) + 100;
        Thread.sleep(time);
        return time;
    }

    /**
     * @param point 集合點
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean goToPoint(String point){
        try {
            if(!randomEvent()){
                phaser.arriveAndDeregister();
                return false;
            }
            String name = Thread.currentThread().getName();
            System.out.println(name + " 花了 " + getRandomTime() + " 時間纔到了" + point);
            phaser.arriveAndAwaitAdvance();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 隨機事件
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean randomEvent() {
        int r = random.nextInt(100);
        String name = Thread.currentThread().getName();
        if (r < 10){
            int friendNum =  1;
            System.out.println(name + ":在這裏居然遇到了"+friendNum+"個朋友,他們說要一塊兒去旅遊...");
            phaser.bulkRegister(friendNum);
            for (int i = 0; i < friendNum; i ++){
                new Thread(new TourismRunnable(phaser), name + "的朋友" + frientCount.getAndAdd(1) + "號").start();
            }
        }else if(r > 90){
            System.out.println(name + ":忽然有事要離開一下,不和他們繼續旅遊了....");
            return false;
        }
        return true;
    }
}

代碼解析

tourism這個方法的case寫法看起有點怪異,若是是爲了知足咱們這個需求,這裏的case的意思是-->case 第幾回集合: if(是否繼續旅遊) 若不繼續則break,不然繼續後面的旅遊
phaser.getPhase() 初始值爲0,若是所有人到達集合點這個Phase+1,若是phaser.getPhase()達到Integer的最大值,這從新清空爲0,在這裏表示第幾回集合了
phaser.arriveAndDeregister(); 表示這我的旅遊到這個景點以後,就離開這個旅遊團了
phaser.arriveAndAwaitAdvance(); 表示這我的在這個景點旅遊完,在等待其餘人
phaser.bulkRegister(friendNum); 表示這我的在這個景點遇到了friendNum個朋友,他們要加入一塊兒旅遊

  • 最後咱們的測試代碼仍是差很少的,比例子1多了一個到齊後的操做

public class TestMain {

    public static void main(String[] args) {
        String name = "明剛紅麗黑白";
        Phaser phaser = new SubPhaser(name.length());
        List<Thread> tourismThread = new ArrayList<>();
        for (char ch : name.toCharArray()){
            tourismThread.add(new Thread(new TourismRunnable(phaser), "小" + ch));
        }
        for (Thread thread : tourismThread){
            thread.start();
        }
    }
    public static class SubPhaser extends Phaser{
        public SubPhaser(int parties) {
            super(parties);
        }

        @Override
        protected boolean onAdvance(int phase, int registeredParties) {

            System.out.println(Thread.currentThread().getName() + ":所有"+getArrivedParties()+"我的都到齊了,如今是第"+(phase + 1)
                    +"次集合準備去下一個地方..................\n");
            return super.onAdvance(phase, registeredParties);
        }
    }
}

運行輸出如下結果:

小白 花了 109 時間纔到了出發點
小紅 花了 135 時間纔到了出發點
小麗 花了 218 時間纔到了出發點
小黑 花了 297 時間纔到了出發點
小明 花了 303 時間纔到了出發點
小剛 花了 440 時間纔到了出發點
小剛:所有6我的都到齊了,如今是第1次集合準備去下一個地方..................

小明:忽然有事要離開一下,不和他們繼續旅遊了....
小剛:忽然有事要離開一下,不和他們繼續旅遊了....
小紅 花了 127 時間纔到了酒店
小麗 花了 162 時間纔到了酒店
小黑 花了 365 時間纔到了酒店
小白 花了 474 時間纔到了酒店
小白:所有4我的都到齊了,如今是第2次集合準備去下一個地方..................

小黑:忽然有事要離開一下,不和他們繼續旅遊了....
小麗:忽然有事要離開一下,不和他們繼續旅遊了....
小紅 花了 348 時間纔到了旅遊點1
小白 花了 481 時間纔到了旅遊點1
小白:所有2我的都到齊了,如今是第3次集合準備去下一個地方..................

小白 花了 128 時間纔到了旅遊點2
小紅 花了 486 時間纔到了旅遊點2
小紅:所有2我的都到齊了,如今是第4次集合準備去下一個地方..................

小紅 花了 159 時間纔到了旅遊點3
小白 花了 391 時間纔到了旅遊點3
小白:所有2我的都到齊了,如今是第5次集合準備去下一個地方..................

小白:在這裏居然遇到了1個朋友,他們說要一塊兒去旅遊...
小白 花了 169 時間纔到了飛機場,準備登機回家
小紅 花了 260 時間纔到了飛機場,準備登機回家
小白的朋友0號 花了 478 時間纔到了飛機場,準備登機回家
小白的朋友0號:所有3我的都到齊了,如今是第6次集合準備去下一個地方..................

經過結果配合我上面的解釋,仍是比較好理解的.

遺漏

這裏還有phaser的中斷和樹形結構沒有舉例子,後續想到比較後的例子,我會繼續作補充的

後記

這篇是我目前爲止寫的最慢的一篇博文,由於以前沒有使用過phaser,致使在寫的出現不少問題.因此一邊查資料,一邊學習,總算仍是把這個phaser給理解了.

打賞

若是以爲個人文章寫的還過得去的話,有錢就捧個錢場,沒錢給我捧我的場(幫我點贊或推薦一下)
微信打賞
支付寶打賞


  1. java7
相關文章
相關標籤/搜索