J.U.C併發包提供了幾個很是有用的、用於併發流程控制的CountDownLatch、CyclicBarrier、Semaphore、類等。java
1,CountDownLatch,閉鎖。實現相似計數器的功能。CountDownLatch的經常使用API以下:數據庫
CountDownLatch(int count) // 構造方法,接受一個int類型參數表示總計數 void await() throws InterruptedException // 阻塞當前線程,直到計數=0,或者線程被中斷 boolean await(long timeout, TimeUnit unit) throws InterruptedException // 一段時間內阻塞當前線程。若是計數=0被喚醒則返回true,若是超時被喚醒則返回false void countDown() // 將計數-1 long getCount() // 獲取當前計數值,經常使用於debug
舉個例子。10個運動員比賽百米賽跑,只有這10個運動員都準備好以後,發令員才能發槍。緩存
public class MyTest4CountDownLatch { public static void main(String[] args) { int playerNumbs = 10; // 運動員數量 CountDownLatch cdl = new CountDownLatch(playerNumbs); // 建立一個CountDownLatch對象(cdl的初始計數=10) Thread starter = new Starter(cdl); // 建立一個發令員 List<Thread> players = new ArrayList<>(playerNumbs); // 建立10個運動員(放到一個list集合中方便操做) for (int i = 0; i < playerNumbs; i++) { Thread player = new Player(i, cdl); players.add(player); } starter.start(); for (int i = 0; i < playerNumbs; i++) { players.get(i).start(); } } } class Player extends Thread{ private CountDownLatch cdl; private int number; Player(int number, CountDownLatch cdl){ this.cdl = cdl; this.number = number; } @Override public void run() { try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("運動員" + number + "準備好了."); cdl.countDown(); // 該運動員準備就緒(cdl的計數-1) } } class Starter extends Thread { private CountDownLatch cdl; Starter(CountDownLatch cdl){ this.cdl = cdl; } @Override public void run() { System.out.println("發令員舉起手槍,等待全部運動員準備就緒."); try { cdl.await(); // 等待全部運動員準備就緒(等待cdl的計數=0) } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("全部運動員已準備就緒,發令員發槍。"); } }
2,CyclicBarrier,循環柵欄。等待線程的數量達到目標的時候,全部等待的線程同時執行。可重置。CyclicBarrier的經常使用API以下:安全
CyclicBarrier(int parties) // 構造方法,指明須要等待的計數 CyclicBarrier(int parties, Runnable barrierAction) // 構造方法,指明須要等待的計數 和 計數=0時的觸發操做 int getParties() // 返回初始化指定的parties int await() throws InterruptedException, BrokenBarrierException // 阻塞當前線程。返回剩餘等待計數。若是返回paties-1則表示是第1個到達。若是返回0則表示最後一個到達 int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException // 一段時間內阻塞等待。響應線程中斷標識。返回值同await() boolean isBroken() // 若是當前cyclicBarrier對象處於broken狀態則返回true void reset() // 重置計數 int getNumberWaiting() // 返回剩餘計數
CyclicBarrier是一個全部線程要麼全經過,要麼全不經過的工具。若是有線程調用await(long timeout, TimeUnit unit) 超時經過,則CyclicBarrier處於broken狀態,其餘別的正在等待的線程會收到InterruptedException, 後續調用await()的線程會收到BrokenBarrierException。多線程
舉個例子。地鐵的修建是按分段來修建的,只有全部的分段都施工完成以後,才能夠通車。併發
public class MyTest4CyclicBarrier { public static void main(String[] args) throws InterruptedException { int lineSegementNumbs = 3; CyclicBarrier cb = new CyclicBarrier(lineSegementNumbs); // 建立CyclicBarrier對象 (cb計數=3) for (int i = 0; i < lineSegementNumbs; i++) { new LineSegment(i, cb).start(); } Thread.sleep(1000); System.out.println("CyclicBarrier重用"); // 自動重置計數 for (int i = 0; i < lineSegementNumbs; i++) { new LineSegment(i, cb).start(); } } } class LineSegment extends Thread{ private int segementNum; private CyclicBarrier cb; LineSegment(int segementNum, CyclicBarrier cb){ this.segementNum = segementNum; this.cb = cb; } @Override public void run() { try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線路段" + segementNum +"已施工完成."); try { cb.await(); // cb計數-1,等待cb計數=0 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("通車了..."); } }
3,Semaphore,信號量。一般用來表示許可數量,用於限制能夠訪問某些資源(物理或邏輯的)的線程數目。Semaphore的經常使用API以下:dom
Semaphore(int permits) // 構造方法,指定許可數量,默認非公平 Semaphore(int permits, boolean fair) // 構造方法,指定許可數量,制定是否公平獲取 void acquire() throws InterruptedException // 阻塞等待許可。阻塞期間響應線程的中斷標識 void acquireUninterruptibly() // 阻塞等待許可,不相應線程中斷標識。 boolean tryAcquire() // 嘗試獲取許可。若是失敗則返回false,若是獲取成功則返回true boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException // 一段時間內阻塞等待許可,阻塞期間響應線程的中斷標識 void release() // 釋放一個許可 // 如下API中,int類型的permits參數表示一次獲取或釋放多個許可 void acquire(int permits) throws InterruptedException // 同 acquire() void acquireUninterruptibly(int permits) // 同 acquireUninterruptibly() boolean tryAcquire(int permits) // 同 tryAcquire() boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException // 同 tryAcquire(long timeout, TimeUnit unit) void release(int permits) // 同 release() // 如下API經常使用於控制、監控或debug int availablePermits() // 剩餘可用的許可數量 int drainPermits() // 將全部剩餘許可置爲0 int getQueueLength() // 等待獲取許可的線程數量
舉個例子。橋每次最多能經過兩我的,每一個人經過橋的時間爲10秒,橋東西兩側各有10我的同時等待準備過橋。輸出每次過橋人的姓名、過橋方向和過橋時間。ide
public class MyTest4Semaphore { public static void main(String[] args) throws InterruptedException { String[] gruopEast = {"張三0", "張三1", "張三2", "張三3", "張三4", "張三5", "張三6", "張三7", "張三8", "張三9"}; String[] gruopWest = {"李四0", "李四1", "李四2", "李四3", "李四4", "李四5", "李四6", "李四7", "李四8", "李四9"}; int takeTime = 10; CyclicBarrier cb = new CyclicBarrier(20); // 使用CyclicBarrier來達到「同時準備過橋」的目的。 Semaphore semaphore = new Semaphore(2); for (int i = 0; i < gruopEast.length; i++) { new Player(gruopEast[i], "西", takeTime, cb, semaphore).start(); } for (int i = 0; i < gruopWest.length; i++) { new Player(gruopWest[i], "東", takeTime, cb, semaphore).start(); } } } class Player extends Thread { private String name; private String destination; private int takeTimeSeconds; private CyclicBarrier cb; private Semaphore semaphore; public Player(String name, String destination, int takeTimeSeconds, CyclicBarrier cb, Semaphore semaphore) { super(); this.name = name; this.destination = destination; this.takeTimeSeconds = takeTimeSeconds; this.cb = cb; this.semaphore = semaphore; } @Override public void run(){ try { cb.await(); semaphore.acquire(); // 阻塞獲取許可 System.out.println(name + "準備向" + destination + "過橋,須要花費" + takeTimeSeconds + "秒"); Thread.sleep(1000 * takeTimeSeconds); semaphore.release(); // 釋放許可 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
4,Phaser,JDK1.7新增,功能上比CyclicBarrier、CountDownLatch更強大,提供更爲豐富的API。經常使用於多線程參與多階段完成的場景。在不一樣階段,能夠等待不一樣數量人的人完成,再進入下一階段。在進入下一個階段的時候,用戶還能夠重寫onAdvance()來實現更佳自定義更加靈活的場景。Phaser的經常使用API以下:工具
Phaser(int parties) // 構造方法,指明有多少個線程參與 int register() // 當前線程註冊到當前階段 int arrive() // 當前線程已完成本身的工做 int arriveAndDeregister() // 當前線程已完成本身的工做,並取消註冊(不參與下一個階段)。 int arriveAndAwaitAdvance() // 當前線程已完成本身的工做,等待進入一下個階段(參與下一個階段) int awaitAdvance(int phase) // 阻塞等待第phase個階段 int awaitAdvanceInterruptibly(int phase) // 阻塞等待第phase個階段,阻塞期間響應線程中斷標識 int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) // 阻塞等待第phase個階段一段時間,阻塞期間響應線程中斷標識 void forceTermination() // 強制將該phaser對象設置爲中指狀態 int getPhase() // 獲取當前第phase個階段 int getRegisteredParties() // 獲取當前階段的註冊數量 int getArrivedParties() // 獲取當前階段已經完成的線程數量 int getUnarrivedParties() // 獲取當前階段還未完成的線程數量 boolean isTerminated() // 判斷該phaser是否可用 boolean onAdvance(int phase, int registeredParties) // protected方法,當一個階段完成後觸發,進入下一個階段以前的動做。
每一個Phaser實例都會維護一個phase number,初始值爲0。每當全部註冊的任務都到達Phaser時,phase number累加,並在超過Integer.MAX_VALUE後清零。arrive()和arriveAndDeregister()方法用於記錄到達;其中arrive(),某個參與者完成任務後調用;arriveAndDeregister(),任務完成,取消本身的註冊。arriveAndAwaitAdvance(),本身完成等待其餘參與者完成,進入阻塞,直到Phaser成功進入下個階段。測試
舉個例子。有一個項目,分爲4個階段完成。第一個階段有工人A,工人B,工人C共同參與。第二階段由工人C、工人D共同完成,第三階段由工人B、工人E共同完成,第四階段由工人A、B、C共同完成。前一個階段完成以後,才能進入下一個階段的工做。每一個階段完成以後,先向經理彙報,而後進入下一階段。
public class MyTest4Phaser { public static void main(String[] args) { Phaser phaser = new MyProject(5); int[] workerAPhaseArray = new int[] {0, 3}; int[] workerBPhaseArray = new int[] {0, 2, 3}; int[] workerCPhaseArray = new int[] {0, 1, 3}; int[] workerDPhaseArray = new int[] {1}; int[] workerEPhaseArray = new int[] {2}; int phaseAmount = 4; Worker workerA = new Worker("A", workerAPhaseArray, phaser, phaseAmount); Worker workerB = new Worker("B", workerBPhaseArray, phaser, phaseAmount); Worker workerC = new Worker("C", workerCPhaseArray, phaser, phaseAmount); Worker workerD = new Worker("D", workerDPhaseArray, phaser, phaseAmount); Worker workerE = new Worker("E", workerEPhaseArray, phaser, phaseAmount); workerA.start(); workerB.start(); workerC.start(); workerD.start(); workerE.start(); } } class MyProject extends Phaser { public MyProject(int parties){ super(parties); } @Override public boolean onAdvance(int currenPphase, int registeredParties) { if (currenPphase < 3) { System.out.println("通知經理:已經完成了第" + currenPphase + "階段任務,準備執行第" + (currenPphase + 1) + "階段任務。"); }else { System.out.println("通知經理:已完成了全部任務"); } return registeredParties == 0; } } class Worker extends Thread { protected String name; protected Phaser phaser; private int[] phaseArray; private int phaseAmount; public Worker (String name,int[] phaseArray, Phaser phaser, int phaseAmount){ super(name); this.name = name; this.phaseArray = phaseArray; this.phaser = phaser; this.phaseAmount = phaseAmount; if (phaseArray == null || phaseArray.length == 0) throw new RuntimeException("工人蔘與的階段錯誤"); } public void doWork(){ Set<Integer> set = intArray2Set(phaseArray); int lastPhase = phaseArray[phaseArray.length-1]; for (int i = 0; i < phaseAmount && (!phaser.isTerminated()); i++) { int currentPhase = phaser.getPhase(); if (set.contains(currentPhase)) { outPrint(currentPhase); if (lastPhase == currentPhase) { phaser.arriveAndDeregister(); break; }else { phaser.arriveAndAwaitAdvance(); } }else{ phaser.arriveAndAwaitAdvance(); } } } public Set<Integer> intArray2Set(int[] phaseArray){ return Arrays.stream(phaseArray).boxed().collect(Collectors.toSet()); } protected void outPrint(int i){ try { System.out.println("工人" + name + "正在執行第" + i + "階段任務"); Thread.sleep(1000); System.out.println("工人" + name + "執行完成第" + i + "階段任務"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void run() { doWork(); } }
5,Exchanger<V>,線程間的數據交換器。兩個線程在一個安全點彼此交換數據。該類比較簡單,就3個API:
public Exchanger() // V exchange(V x) throws InterruptedException // V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException //
第一個調用exchange()的線程會阻塞等待,直到第二個線程調用exchange()來完成彼此數據的交換。
舉個例子。飛機駕駛員有主飛和副飛,重要消息須要兩者互相確認的。
public class MyTest4Exchanger { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); Thread primary = new Pilot("主飛", "10分鐘後降落", exchanger); Thread secondary = new Pilot("副飛", "10分鐘後降落", exchanger); primary.start(); secondary.start(); } } class Pilot extends Thread { private String pilotName; private String receivedMsg; private Exchanger<String> exchanger; Pilot(String pilotName, String receivedMsg, Exchanger<String> exchanger){ this.pilotName = pilotName; this.receivedMsg = receivedMsg; this.exchanger = exchanger; } @Override public void run() { String ownReceivedMsg = receivedMsg; String otherReceivedMsg = null; try { otherReceivedMsg = exchanger.exchange(ownReceivedMsg); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(pilotName + "彙報:" + (ownReceivedMsg.equals(otherReceivedMsg) ? "消息一致." : "不消息一致.")); } }
以上介紹的5個工具類均爲線程間的互相通信的工具類。還有一個線程私有的工具類,ThreadLocal。不過,ThreadLocal是存在於java.lang包下的。
6,ThreadLocal<T>,本地線程變量。ThreadLocal爲每一個使用該變量的線程提供獨立的變量副本,因此每個線程均可以獨立地改變本身的副本,而不會影響其它線程所對應的副本。ThreadLocal的API也比較簡單:
ThreadLocal() // 構造方法 T get() // 獲取當前線程中存儲的本地變量 void set(T value) // 將value設置到當前線程的本地變量中存儲 void remove() // 刪除當前線程中存儲的本地變量
舉個例子。在多數據源處理中,以讀寫分離爲例,能夠將數據源的標識放到ThreadLocal中,使用aop來自動完成切換工做。本例就簡單的模擬一下。(例子中不採用AOP了,直接代碼中體現)
public class MyTest4ThreadLocal { public static void main(String[] args) { Map<String, String> name2DataSource = new HashMap<>(); // 緩存每一個datasource,value以String代替 name2DataSource.put("read", "ReadDataBase"); name2DataSource.put("write", "WriteDataBase"); for(int i = 0; i < 10; i++){ new BussinessThread(i, name2DataSource).start(); } } } class DataSourceHolder { private static final ThreadLocal<String> dataSources = new ThreadLocal<>(); public static void setDataSourceKey(String customType) { dataSources.set(customType); } public static String getDataSourceKey() { return (String) dataSources.get(); } public static void clearDataSourceKey() { dataSources.remove(); } } class BussinessThread extends Thread { private Map<String, String> name2DataSource; private int number; BussinessThread(int number, Map<String, String> name2DataSource){ this.number = number; this.name2DataSource = name2DataSource; } @Override public void run() { System.out.println("業務線程" + number + "準備進行讀操做"); DataSourceHolder.setDataSourceKey("read"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("業務線程" + number + "讀操做對應的數據庫是:" + name2DataSource.get(DataSourceHolder.getDataSourceKey())); DataSourceHolder.clearDataSourceKey(); // 再測試寫操做。 try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("業務線程" + number + "準備進行寫操做"); DataSourceHolder.setDataSourceKey("write"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("業務線程" + number + "寫操做對應的數據庫是:" + name2DataSource.get(DataSourceHolder.getDataSourceKey())); DataSourceHolder.clearDataSourceKey(); } }
ThreadLocal在使用完以後必定要手動threadlocal.remove()。緣由有二: 1,若是使用不當會形成內存泄漏。線程類Thread中都有一份ThreadLocalMap的變量用來存儲線程本地變量。因爲ThreadLocalMap的生命週期跟Thread同樣長,若是使用完以後沒有手動threadlocal.remove()刪除則會產生內存泄漏。 2,使用線程池的使用,線程是反覆利用的資源,回收前的線程的副本變量會可能對再次時形成影響。
so,正確使用ThreadLocal的姿式要注意兩點: 1,ThreadLocal設置爲類的靜態變量。這樣就只維持一份。 2,set(T value)設置,get()使用以後,必定要記得remove()刪除。
參考資料: