我理解的Java併發基礎(五):併發工具類和ThreadLocal

J.U.C併發包提供了幾個很是有用的、用於併發流程控制的CountDownLatch、CyclicBarrier、Semaphore、類等。java

1,CountDownLatch,閉鎖。實現相似計數器的功能。CountDownLatch的經常使用API以下:數據庫

CountDownLatch(int count) // 構造方法,接受一個int類型參數表示總計數
void await() throws InterruptedException // 阻塞當前線程,直到計數=0,或者線程被中斷
boolean await(long timeout, TimeUnit unit) throws InterruptedException // 一段時間內阻塞當前線程。若是計數=0被喚醒則返回true,若是超時被喚醒則返回false
void countDown() // 將計數-1
long getCount() // 獲取當前計數值,經常使用於debug

舉個例子。10個運動員比賽百米賽跑,只有這10個運動員都準備好以後,發令員才能發槍。緩存

public class MyTest4CountDownLatch {
    
    public static void main(String[] args) {
        int playerNumbs = 10; // 運動員數量
        CountDownLatch cdl = new CountDownLatch(playerNumbs); // 建立一個CountDownLatch對象(cdl的初始計數=10)
        
        Thread starter = new Starter(cdl); // 建立一個發令員
        List<Thread> players = new ArrayList<>(playerNumbs); // 建立10個運動員(放到一個list集合中方便操做)
        for (int i = 0; i < playerNumbs; i++) {
            Thread player = new Player(i, cdl);
            players.add(player);
        }
        
        starter.start(); 
        for (int i = 0; i < playerNumbs; i++) {
            players.get(i).start();
        }
        
    }
    
}

class Player extends Thread{
    
    private CountDownLatch cdl;
    private int number;
    
    Player(int number, CountDownLatch cdl){
        this.cdl = cdl;
        this.number = number;
    }
    
    @Override
    public void run() {
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("運動員" + number + "準備好了.");
        cdl.countDown(); // 該運動員準備就緒(cdl的計數-1)
    }
}

class Starter extends Thread {
    private CountDownLatch cdl;
    
    Starter(CountDownLatch cdl){
        this.cdl = cdl;
    }
    
    @Override
    public void run() {
        System.out.println("發令員舉起手槍,等待全部運動員準備就緒.");
        try {
            cdl.await(); // 等待全部運動員準備就緒(等待cdl的計數=0)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("全部運動員已準備就緒,發令員發槍。");
    }
}

2,CyclicBarrier,循環柵欄。等待線程的數量達到目標的時候,全部等待的線程同時執行。可重置。CyclicBarrier的經常使用API以下:安全

CyclicBarrier(int parties) // 構造方法,指明須要等待的計數
CyclicBarrier(int parties, Runnable barrierAction) // 構造方法,指明須要等待的計數 和 計數=0時的觸發操做
int getParties() // 返回初始化指定的parties
int await() throws InterruptedException, BrokenBarrierException // 阻塞當前線程。返回剩餘等待計數。若是返回paties-1則表示是第1個到達。若是返回0則表示最後一個到達
int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException // 一段時間內阻塞等待。響應線程中斷標識。返回值同await()
boolean isBroken() // 若是當前cyclicBarrier對象處於broken狀態則返回true
void reset() // 重置計數
int getNumberWaiting() // 返回剩餘計數

  CyclicBarrier是一個全部線程要麼全經過,要麼全不經過的工具。若是有線程調用await(long timeout, TimeUnit unit) 超時經過,則CyclicBarrier處於broken狀態,其餘別的正在等待的線程會收到InterruptedException, 後續調用await()的線程會收到BrokenBarrierException。多線程

舉個例子。地鐵的修建是按分段來修建的,只有全部的分段都施工完成以後,才能夠通車。併發

public class MyTest4CyclicBarrier {

    public static void main(String[] args) throws InterruptedException {
        int lineSegementNumbs = 3;
        CyclicBarrier cb = new CyclicBarrier(lineSegementNumbs); // 建立CyclicBarrier對象 (cb計數=3)
        
        for (int i = 0; i < lineSegementNumbs; i++) {
            new LineSegment(i, cb).start();
        }
        
        Thread.sleep(1000);
        System.out.println("CyclicBarrier重用"); // 自動重置計數
        
        for (int i = 0; i < lineSegementNumbs; i++) {
            new LineSegment(i, cb).start();
        }
        
        
    }
}

class LineSegment extends Thread{
    
    private int segementNum;
    private CyclicBarrier cb;
    
    LineSegment(int segementNum, CyclicBarrier cb){
        this.segementNum = segementNum;
        this.cb = cb;
    }
    
    
    @Override
    public void run() {
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("線路段" + segementNum +"已施工完成.");
        try {
            cb.await(); // cb計數-1,等待cb計數=0
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("通車了...");
    }
    
}

3,Semaphore,信號量。一般用來表示許可數量,用於限制能夠訪問某些資源(物理或邏輯的)的線程數目。Semaphore的經常使用API以下:dom

Semaphore(int permits) // 構造方法,指定許可數量,默認非公平
Semaphore(int permits, boolean fair) // 構造方法,指定許可數量,制定是否公平獲取
void acquire() throws InterruptedException // 阻塞等待許可。阻塞期間響應線程的中斷標識
void acquireUninterruptibly() // 阻塞等待許可,不相應線程中斷標識。
boolean tryAcquire() // 嘗試獲取許可。若是失敗則返回false,若是獲取成功則返回true
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException // 一段時間內阻塞等待許可,阻塞期間響應線程的中斷標識
void release() // 釋放一個許可

// 如下API中,int類型的permits參數表示一次獲取或釋放多個許可
void acquire(int permits) throws InterruptedException // 同 acquire()
void acquireUninterruptibly(int permits) // 同 acquireUninterruptibly()
boolean tryAcquire(int permits) // 同 tryAcquire()
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException // 同 tryAcquire(long timeout, TimeUnit unit)
void release(int permits) // 同 release()

// 如下API經常使用於控制、監控或debug
int availablePermits() // 剩餘可用的許可數量
int drainPermits() // 將全部剩餘許可置爲0
int getQueueLength() // 等待獲取許可的線程數量

舉個例子。橋每次最多能經過兩我的,每一個人經過橋的時間爲10秒,橋東西兩側各有10我的同時等待準備過橋。輸出每次過橋人的姓名、過橋方向和過橋時間。ide

public class MyTest4Semaphore {

    public static void main(String[] args) throws InterruptedException {
        String[] gruopEast = {"張三0", "張三1", "張三2", "張三3", "張三4", "張三5", "張三6", "張三7", "張三8", "張三9"};
        String[] gruopWest = {"李四0", "李四1", "李四2", "李四3", "李四4", "李四5", "李四6", "李四7", "李四8", "李四9"};

        int takeTime = 10;

        CyclicBarrier cb = new CyclicBarrier(20); // 使用CyclicBarrier來達到「同時準備過橋」的目的。
        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i < gruopEast.length; i++) {
            new Player(gruopEast[i], "西", takeTime, cb, semaphore).start();
        }

        for (int i = 0; i < gruopWest.length; i++) {
            new Player(gruopWest[i], "東", takeTime, cb, semaphore).start();
        }
    }
}

class Player extends Thread {
    private String name;
    private String destination;
    private int takeTimeSeconds;
    private CyclicBarrier cb;
    private Semaphore semaphore;

    public Player(String name, String destination, int takeTimeSeconds, CyclicBarrier cb, Semaphore semaphore) {
        super();
        this.name = name;
        this.destination = destination;
        this.takeTimeSeconds = takeTimeSeconds;
        this.cb = cb;
        this.semaphore = semaphore;
    }

    @Override
    public void run(){
        try {
            cb.await();
            semaphore.acquire(); // 阻塞獲取許可
            System.out.println(name + "準備向" + destination + "過橋,須要花費" + takeTimeSeconds + "秒");
            Thread.sleep(1000 * takeTimeSeconds);
            semaphore.release(); // 釋放許可
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

4,Phaser,JDK1.7新增,功能上比CyclicBarrier、CountDownLatch更強大,提供更爲豐富的API。經常使用於多線程參與多階段完成的場景。在不一樣階段,能夠等待不一樣數量人的人完成,再進入下一階段。在進入下一個階段的時候,用戶還能夠重寫onAdvance()來實現更佳自定義更加靈活的場景。Phaser的經常使用API以下:工具

Phaser(int parties) // 構造方法,指明有多少個線程參與
int register() // 當前線程註冊到當前階段
int arrive() // 當前線程已完成本身的工做
int arriveAndDeregister() // 當前線程已完成本身的工做,並取消註冊(不參與下一個階段)。
int arriveAndAwaitAdvance()  // 當前線程已完成本身的工做,等待進入一下個階段(參與下一個階段)
int awaitAdvance(int phase) // 阻塞等待第phase個階段
int awaitAdvanceInterruptibly(int phase) //  阻塞等待第phase個階段,阻塞期間響應線程中斷標識
int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) // 阻塞等待第phase個階段一段時間,阻塞期間響應線程中斷標識
void forceTermination() // 強制將該phaser對象設置爲中指狀態
int getPhase() // 獲取當前第phase個階段
int getRegisteredParties() // 獲取當前階段的註冊數量
int getArrivedParties() // 獲取當前階段已經完成的線程數量
int getUnarrivedParties() // 獲取當前階段還未完成的線程數量
boolean isTerminated() // 判斷該phaser是否可用
boolean onAdvance(int phase, int registeredParties) // protected方法,當一個階段完成後觸發,進入下一個階段以前的動做。

  每一個Phaser實例都會維護一個phase number,初始值爲0。每當全部註冊的任務都到達Phaser時,phase number累加,並在超過Integer.MAX_VALUE後清零。arrive()和arriveAndDeregister()方法用於記錄到達;其中arrive(),某個參與者完成任務後調用;arriveAndDeregister(),任務完成,取消本身的註冊。arriveAndAwaitAdvance(),本身完成等待其餘參與者完成,進入阻塞,直到Phaser成功進入下個階段。測試

舉個例子。有一個項目,分爲4個階段完成。第一個階段有工人A,工人B,工人C共同參與。第二階段由工人C、工人D共同完成,第三階段由工人B、工人E共同完成,第四階段由工人A、B、C共同完成。前一個階段完成以後,才能進入下一個階段的工做。每一個階段完成以後,先向經理彙報,而後進入下一階段。

public class MyTest4Phaser {

    public static void main(String[] args) {
        Phaser phaser = new MyProject(5);

        int[] workerAPhaseArray = new int[] {0, 3};
        int[] workerBPhaseArray = new int[] {0, 2, 3};
        int[] workerCPhaseArray = new int[] {0, 1, 3};
        int[] workerDPhaseArray = new int[] {1};
        int[] workerEPhaseArray = new int[] {2};

        int phaseAmount = 4;

        Worker workerA = new Worker("A", workerAPhaseArray, phaser, phaseAmount);
        Worker workerB = new Worker("B", workerBPhaseArray, phaser, phaseAmount);
        Worker workerC = new Worker("C", workerCPhaseArray, phaser, phaseAmount);
        Worker workerD = new Worker("D", workerDPhaseArray, phaser, phaseAmount);
        Worker workerE = new Worker("E", workerEPhaseArray, phaser, phaseAmount);

        workerA.start();
        workerB.start();
        workerC.start();
        workerD.start();
        workerE.start();

    }
}

class MyProject extends Phaser {

    public MyProject(int parties){
        super(parties);
    }

    @Override
    public boolean onAdvance(int currenPphase, int registeredParties) {
        if (currenPphase < 3) {
            System.out.println("通知經理:已經完成了第" + currenPphase + "階段任務,準備執行第" + (currenPphase + 1) + "階段任務。");
        }else {
            System.out.println("通知經理:已完成了全部任務");
        }
        return registeredParties == 0;
    }
}

class Worker extends Thread {
    protected String name;
    protected Phaser phaser;
    private int[] phaseArray;
    private int phaseAmount;

    public Worker (String name,int[] phaseArray, Phaser phaser, int phaseAmount){
        super(name);
        this.name = name;
        this.phaseArray = phaseArray;
        this.phaser = phaser;
        this.phaseAmount = phaseAmount;
        if (phaseArray == null || phaseArray.length == 0) throw new RuntimeException("工人蔘與的階段錯誤");
    }

    public void doWork(){
        Set<Integer> set = intArray2Set(phaseArray);
        int lastPhase = phaseArray[phaseArray.length-1]; 
        for (int i = 0; i < phaseAmount && (!phaser.isTerminated()); i++) {
            int currentPhase = phaser.getPhase();
            if (set.contains(currentPhase)) {
                outPrint(currentPhase);
                if (lastPhase == currentPhase) {
                    phaser.arriveAndDeregister();
                    break;
                }else {
                    phaser.arriveAndAwaitAdvance();
                }
            }else{
                phaser.arriveAndAwaitAdvance();
            }
        }

    }

    public Set<Integer> intArray2Set(int[] phaseArray){
        return Arrays.stream(phaseArray).boxed().collect(Collectors.toSet());
    }

    protected void outPrint(int i){
        try {
            System.out.println("工人" + name + "正在執行第" + i + "階段任務");
            Thread.sleep(1000);
            System.out.println("工人" + name + "執行完成第" + i + "階段任務");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        doWork();
    }
}

5,Exchanger<V>,線程間的數據交換器。兩個線程在一個安全點彼此交換數據。該類比較簡單,就3個API:

public Exchanger() // 
V exchange(V x) throws InterruptedException // 
V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException //

  第一個調用exchange()的線程會阻塞等待,直到第二個線程調用exchange()來完成彼此數據的交換。

舉個例子。飛機駕駛員有主飛和副飛,重要消息須要兩者互相確認的。

public class MyTest4Exchanger {

    public static void main(String[] args) {
        
        Exchanger<String> exchanger = new Exchanger<>();
        Thread primary = new Pilot("主飛", "10分鐘後降落", exchanger);
        Thread secondary = new Pilot("副飛", "10分鐘後降落", exchanger);
        
        primary.start();
        secondary.start();
    }
}

class Pilot extends Thread {
    
    private String pilotName;
    private String receivedMsg;
    private Exchanger<String> exchanger;
    
    Pilot(String pilotName, String receivedMsg, Exchanger<String> exchanger){
        this.pilotName = pilotName;
        this.receivedMsg = receivedMsg;
        this.exchanger = exchanger;
    }
    
    @Override
    public void run() {
        String ownReceivedMsg = receivedMsg;
        String otherReceivedMsg = null;
        try {
            otherReceivedMsg = exchanger.exchange(ownReceivedMsg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(pilotName + "彙報:" + (ownReceivedMsg.equals(otherReceivedMsg) ? "消息一致." : "不消息一致."));
    }
    
}

  以上介紹的5個工具類均爲線程間的互相通信的工具類。還有一個線程私有的工具類,ThreadLocal。不過,ThreadLocal是存在於java.lang包下的。

6,ThreadLocal<T>,本地線程變量。ThreadLocal爲每一個使用該變量的線程提供獨立的變量副本,因此每個線程均可以獨立地改變本身的副本,而不會影響其它線程所對應的副本。ThreadLocal的API也比較簡單:

ThreadLocal() // 構造方法
T get() // 獲取當前線程中存儲的本地變量
void set(T value) // 將value設置到當前線程的本地變量中存儲
void remove() // 刪除當前線程中存儲的本地變量

舉個例子。在多數據源處理中,以讀寫分離爲例,能夠將數據源的標識放到ThreadLocal中,使用aop來自動完成切換工做。本例就簡單的模擬一下。(例子中不採用AOP了,直接代碼中體現)

public class MyTest4ThreadLocal {

    
    public static void main(String[] args) {
        
        Map<String, String> name2DataSource = new HashMap<>(); // 緩存每一個datasource,value以String代替
        name2DataSource.put("read", "ReadDataBase");
        name2DataSource.put("write", "WriteDataBase");
        
        for(int i = 0; i < 10; i++){
            new BussinessThread(i, name2DataSource).start();
        }
        
    }
}

class DataSourceHolder {
    
    private static final ThreadLocal<String> dataSources = new ThreadLocal<>();

    public static void setDataSourceKey(String customType) {
        dataSources.set(customType);
    }

    public static String getDataSourceKey() {
        return (String) dataSources.get();
    }

    public static void clearDataSourceKey() {
        dataSources.remove();
    }
}

class BussinessThread extends Thread {
    
    private Map<String, String> name2DataSource;
    private int number;
    BussinessThread(int number, Map<String, String> name2DataSource){
        this.number = number;
        this.name2DataSource = name2DataSource;
    }
    
    @Override
    public void run() {
        
        System.out.println("業務線程" + number + "準備進行讀操做");
        DataSourceHolder.setDataSourceKey("read");
        
        try {
            Thread.sleep((long)(Math.random()*1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("業務線程" + number + "讀操做對應的數據庫是:" + name2DataSource.get(DataSourceHolder.getDataSourceKey()));
        DataSourceHolder.clearDataSourceKey();
        
        // 再測試寫操做。
        try {
            Thread.sleep((long)(Math.random()*1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("業務線程" + number + "準備進行寫操做");
        DataSourceHolder.setDataSourceKey("write");
        
        try {
            Thread.sleep((long)(Math.random()*1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("業務線程" + number + "寫操做對應的數據庫是:" + name2DataSource.get(DataSourceHolder.getDataSourceKey()));
        DataSourceHolder.clearDataSourceKey();
    }
}

  ThreadLocal在使用完以後必定要手動threadlocal.remove()。緣由有二: 1,若是使用不當會形成內存泄漏。線程類Thread中都有一份ThreadLocalMap的變量用來存儲線程本地變量。因爲ThreadLocalMap的生命週期跟Thread同樣長,若是使用完以後沒有手動threadlocal.remove()刪除則會產生內存泄漏。 2,使用線程池的使用,線程是反覆利用的資源,回收前的線程的副本變量會可能對再次時形成影響。
  so,正確使用ThreadLocal的姿式要注意兩點: 1,ThreadLocal設置爲類的靜態變量。這樣就只維持一份。 2,set(T value)設置,get()使用以後,必定要記得remove()刪除。

參考資料:

  • 以上內容爲筆者平常瑣屑積累,已無從考究引用。若是有,請站內信提示。
相關文章
相關標籤/搜索