Java併發編程--5.信號量和障礙器

Semaphore信號量

簡介

它本質上是一個共享鎖,限制訪問公共資源的線程數目,它也被稱爲計數信號量
acquire()許可一個線程, Semaphore – 1; 沒有可用的許可時,Semaphore=0 ,線程阻塞
release()釋放一個線程, Semaphore + 1html

示例

public class MySemaphore {
    public static void main(String[] args) {
        // 使用線程池
        ExecutorService exec = Executors.newCachedThreadPool();
        // 只容許3個線程同時訪問
        final Semaphore semp = new Semaphore(3);
        
        // 模擬4個客戶端訪問
        for (int index = 0; index < 4; index++) {
            
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        // 獲取許可
                        semp.acquire();
                        
                        System.out.println("線程"+ Thread.currentThread().getName() + "得到許可:");
                        
                        // 模擬耗時的任務
                        for (int i = 0; i < 999999; i++);
                        
                        // 釋放許可
                        semp.release();
                        
                        System.out.println("線程"+ Thread.currentThread().getName() + "釋放許可:");
                        System.out.println("當前容許進入的任務個數:"+ semp.availablePermits());
                    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            
            exec.execute(run);
        }
        // 關閉線程池
        exec.shutdown();
    }
}

控制檯輸出:java

線程pool-1-thread-1得到許可:
線程pool-1-thread-2得到許可:
線程pool-1-thread-2釋放許可: 
當前容許進入的任務個數:2       //總共容許3個許可, 獲取兩個許可, 釋放一個許可, 剩餘2個許可
線程pool-1-thread-1釋放許可:
當前容許進入的任務個數:2      //釋放一個許可, 應該打印出1, 能夠看出, Semaphore並不保證線程安全 
線程pool-1-thread-3得到許可:
線程pool-1-thread-3釋放許可:
當前容許進入的任務個數:2
線程pool-1-thread-4得到許可:
線程pool-1-thread-4釋放許可:
當前容許進入的任務個數:3

CyclicBarrier 障礙器

簡介

容許一組線程互相等待,到達一個公共的障礙點, 該組任務完成後, 再去完成另一個任務
在釋放等待線程後能夠重用,它是循環的barrier安全

示例

public class MyCyclicBarrier {
    public static void main(String[] args) {   
        //建立CyclicBarrier對象, 並設置執行完一組5個線程的併發任務後,再執行MainTask任務  
        CyclicBarrier cb = new CyclicBarrier(5, new MainTask());  
        
        new SubTask("A", cb).start();   
        new SubTask("B", cb).start();   
        new SubTask("C", cb).start();   
        new SubTask("D", cb).start();   
        new SubTask("E", cb).start();  
}   
}   

/** 最後執行的任務 */
class MainTask implements Runnable {
    public void run() {
        System.out.println("......終於要執行最後的任務了......");
    }
}

/** 一組併發任務 */
class SubTask extends Thread {
    private String name;
    private CyclicBarrier cb;

    SubTask(String name, CyclicBarrier cb) {
        this.name = name;
        this.cb = cb;
    }

    public void run() {
        System.out.println("[併發任務" + name + "]  開始執行");
        
        for (int i = 0; i < 999999; i++); // 模擬耗時的任務
        
        System.out.println("[併發任務" + name + "]  執行完畢,通知障礙器");
        try {
            // 每執行完一項任務就通知障礙器
            cb.await();
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

控制檯輸出:併發

[併發任務A]  開始執行
[併發任務D]  開始執行
[併發任務C]  開始執行
[併發任務B]  開始執行
[併發任務E]  開始執行
[併發任務B]  執行完畢,通知障礙器
[併發任務E]  執行完畢,通知障礙器
[併發任務D]  執行完畢,通知障礙器
[併發任務A]  執行完畢,通知障礙器
[併發任務C]  執行完畢,通知障礙器
......終於要執行最後的任務了......     //能夠看出執行一組任務後,在執行這個線程任務

CountDownLatch 障礙器 

簡介

容許1或N個線程等待其餘線程完成後在執行
調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。這種現象只出現一次——計數沒法被重置dom

示例

public class MyCountDownLatch {
    public static void main(String[] args) {
        //啓動會議室線程,等待與會人員參加會議
        Conference conference = new Conference(3);
        new Thread(conference).start();
        
        //參會者線程
        for(int i = 0 ; i < 3 ; i++){
            Participater participater = new Participater("" + i , conference);
            Thread thread = new Thread(participater);
            thread.start();
        }
    }
}

/** 會場類 */
class Conference implements Runnable{
    private final CountDownLatch countDown;//障礙器
    
    public Conference(int count){
        countDown = new CountDownLatch(count);
    }
    
    /** 與會人員到達 */
    public void arrive(String name){
        System.out.println(name + "到達.....");
        
        //到達一個,鎖計數器 - 1, 在計數到達0以前會一直阻塞
        countDown.countDown();
        
        System.out.println("還有 " + countDown.getCount() + "位沒有到達...");
    }
    
    @Override
    public void run() {
        System.out.println("準備開會,參加會議人員總數爲:" + countDown.getCount());
        
        //調用await(),等待全部的與會人員到達
        try {
            countDown.await();
        } catch (InterruptedException e) {
        }
        
        System.out.println("全部人員已經到達,會議開始.....");
    }
}

/** 參會者類*/
class Participater implements Runnable{
    private String name;
    private Conference conference;
    
    public Participater(String name,Conference conference){
        this.name = name;
        this.conference = conference;
    }

    @Override
    public void run() {
        conference.arrive(name);
    }
}

控制檯輸出:ide

準備開會,參加會議人員總數爲:3
2到達.....
還有 2位沒有到達...
0到達.....
還有 1位沒有到達...
1到達.....
全部人員已經到達,會議開始.....
還有 0位沒有到達...

Phaser

簡介

推薦閱讀: http://whitesock.iteye.com/blog/1135457 ui

             http://www.2cto.com/kf/201611/560952.htmlthis

 

任務數目是可變的: 能夠在任什麼時候間註冊新的參與者;而且在抵達屏障點時,能夠註銷已經註冊的參與者atom

phase和party

phase就是階段,初值爲0:spa

當全部的線程執行完本輪任務,同時開始下一輪任務時,意味着當前階段已結束,
進入到下一階段,phase的值自動加1

party就是線程:  party=4就意味着Phaser對象當前管理着4個線程

boolean onAdvance(int phase, int registeredParties) :

1.當此方法返回true時,意味着Phaser被終止, 若此方法返回值爲 phase>=3,其含義爲當整個線程執行了4個階段後,程序終止

2.當每個階段執行完畢,此方法會被自動調用 ,此方法內的代碼會在每一個階段執行完畢時執行

示例: 可變數目的任務

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

/**
 *可變數目: 動態註冊和取消 
 *
 *示例: 
 *    在旅遊過程當中,有可能很湊巧遇到幾個朋友,
 *    而後他們據說大家在旅遊,因此想要加入一塊兒繼續接下來的旅遊.
 *    也有可能,在旅遊過程當中,忽然其中有某幾我的臨時有事,想退出此次旅遊了
 */
public class MyPhaser_5 {
    public static void main(String[] args) {
        final int num = 3;
        Phaser phaser = new Phaser(num){
            /**
             * 若是該方法返回true,那麼Phaser會被終止, 默認實現是在註冊任務數爲0時返回true
             * phase : 階段數
             * registeredParties : 註冊的線程數 
             */
             @Override
             protected boolean onAdvance(int phase, int registeredParties) {
                 System.out.println("" + getArrivedParties() + "我的都到齊了,第" + (phase + 1) + "次集合 \n");
                 return phase >= num;
             }
        };
        
        new Thread(new TourismRunnable(phaser),"小明").start();
        new Thread(new TourismRunnable(phaser),"小剛").start();
        new Thread(new TourismRunnable(phaser),"小紅").start();
    }
}

/** 旅行線程 */
class TourismRunnable implements Runnable{
    Phaser phaser;
    /**
     * 每一個線程保存一個朋友計數器,小紅第一次遇到一個朋友,取名`小紅的朋友0號`,第二次遇到一個朋友,取名爲`小紅的朋友1號`
     */
    AtomicInteger frientCount = new AtomicInteger();
    
    public TourismRunnable(Phaser phaser) {
        this.phaser = phaser;
    }
 
    @Override
    public void run() {
         switch (phaser.getPhase()){
             case 0:if(!goToPoint("出發點")) break;
             case 1:if(!goToPoint("旅遊景點")) break;
             case 2:if(!goToPoint("酒店")) break;
         }
    }
 
    /**
     * @param point 目的地
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean goToPoint(String point){
        try {
            if(!randomEvent()){
                //取消註冊
                phaser.arriveAndDeregister();
                return false;
            }
            System.out.println(Thread.currentThread().getName() + "到了" + point);
            
            //阻塞
            phaser.arriveAndAwaitAdvance();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
 
    /**
     * 隨機事件: 遇到新朋友一塊兒旅遊 或者 中途退出旅遊
     * @return 返回true,說明還要繼續旅遊,不然就臨時退出了
     */
    private boolean randomEvent() {
        int random = new Random().nextInt(100);
        String name = Thread.currentThread().getName();
        
        if (random < 10){
            int friendNum =  1;
            System.out.println("=====================" + name + ":遇到了"+friendNum+"個朋友,要一塊兒去旅遊");
            
            new Thread(new TourismRunnable(phaser), name + "的朋友" + frientCount.incrementAndGet() + "號").start();
            //註冊
            phaser.bulkRegister(friendNum);
            
        }else if(random > 80){
            System.out.println("=====================" + name + ":忽然有事要離開一下,不和他們繼續旅遊了");
            return false;
        }
        
        return true;
    }
}
相關文章
相關標籤/搜索