Semaphore能夠維護當前訪問自身的線程個數,並提供了同步機制。使用Semaphore能夠控制同時訪問資源的線程個數,例如,實現一個文件容許的併發訪問數。java
Semaphore實現的功能就相似銀行有6個窗口,12我的有業務要操做,那麼同時只能有6我的佔用窗口,當有的人業務操做完畢以後,讓開位置,其它等待的人羣中,有一人能夠佔用當前窗口,操做本身的業務。併發
另外等待的5我的中能夠是隨機得到優先機會,也能夠是按照先來後到的順序得到機會,這取決於構造Semaphore對象時傳入的參數選項。dom
單個信號量的Semaphore對象能夠實現互斥鎖的功能,而且能夠是由一個線程得到了「鎖」,再由另外一個線程釋放「鎖」,這可應用於死鎖恢復的一些場合。ide
package com.chunjiangchao.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * 同步信號量的使用 * @author chunjaingchao * 隨時能夠調整Semaphore中可併發的數量 */ public class SemaphoreDemo { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(1);//容許的併發數 for(int i=0;i<10;i++){ threadPool.execute(new Runnable(){ @Override public void run() { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"正在執行,當前已經有"+(1-semaphore.availablePermits())+"個併發"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); } }); } } }
表示你們彼此等待,你們集合好後纔開始出發,分散活動後又在指定地點集合碰面,這就比如整個公司的人員利用週末時間集體郊遊同樣,先各自從家出發到公司集合後,再同時出發到公園遊玩,在指定地點集合後再同時開始就餐,…。工具
一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。ui
package com.chunjiangchao.thread; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * CyclicBarrier的使用 * @author chunjiangchao * */ public class CyclicBarrierDemo { public static void main(String[] args) { final int threadNum = 5; ExecutorService threadPool = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(threadNum); for(int i=0;i<threadNum;i++){ threadPool.execute(new Runnable(){ @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("線程" + Thread.currentThread().getName() + "運行到Barrier1,已有"+ (cb.getNumberWaiting() + 1)+ "個已經到達," + (cb.getNumberWaiting() == threadNum-1 ? "都到齊了,繼續走啊" : "正在等候")); cb.await();//障礙點1:當前線程在await這個障礙地點停頓,等着其它線程運行到這 Thread.sleep((long) (Math.random() * 10000)); System.out.println("線程" + Thread.currentThread().getName() + "運行到Barrier2,已有"+ (cb.getNumberWaiting() + 1)+ "個已經到達," + (cb.getNumberWaiting() == threadNum-1 ? "都到齊了,繼續走啊" : "正在等候")); cb.await();//障礙點2: Thread.sleep((long) (Math.random() * 10000)); System.out.println("線程" + Thread.currentThread().getName() + "運行到Barrier3,已有"+ (cb.getNumberWaiting() + 1)+ "個已經到達," + (cb.getNumberWaiting() == threadNum-1 ? "都到齊了,繼續走啊" : "正在等候")); cb.await();//障礙點3: } catch (Exception e) { e.printStackTrace(); } } }); } //正在執行的任務接着執行,後續不容許添加任務 threadPool.shutdown(); } }
一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。this
猶如倒計時計數器,調用CountDownLatch對象的countDown方法就將計數器減1,當計數到達0時,則全部等待者或單個等待者開始執行。spa
能夠實現一我的(也能夠是多我的)等待其餘全部人都來通知他,這猶如一個計劃須要多個領導都簽字後才能繼續向下實施。還能夠實現一我的通知多我的的效果,相似裁判一聲口令,運動員同時開始奔跑。用這個功能作百米賽跑的遊戲程序不錯哦!線程
package com.chunjiangchao.thread; import java.util.Random; import java.util.concurrent.CountDownLatch; /** * 建立這個CountDownLatch對象的時候,會傳入計數器個數,當前線程調用await方法進行等待其它線程的操做, * 當其餘線程操做計數器的值直到0的時候,纔會繼續執行後續操做 * @author chuangjiangchao * */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(3); for(int i=0;i<3;i++){ new WorkThread(countDownLatch).start(); } System.out.println("老大在這兒等着"); countDownLatch.await(); System.out.println("大家都跑完了,該我走人了"); } private static class WorkThread extends Thread{ private CountDownLatch countDownLatch; public WorkThread(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run() { System.out.println("執行耗時的操做"); try { Thread.sleep(100*new Random().nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); System.out.println(Thread.currentThread().getName()+"跑完任務,計數器值改變"); } } }
用於實現兩個線程之間的數據交換,每一個線程在完成必定的事務後想與對方交換數據,第一個先拿出數據的線程將一直等待第二個線程拿着數據到來時,才能彼此交換數據。code
(誰先到達,誰就等待另一個線程到達,而後開始交換數據。最後執行各自的動做。)
package com.chunjiangchao.thread; import java.util.Date; import java.util.Random; import java.util.concurrent.Exchanger; /** * 兩個線程之間的數據交換 * @author chunjiangchao * */ public class ExchangerDemo { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<String>();//經過這玩意兒來交換數據 // ExecutorService threadPool = Executors.newCachedThreadPool(); // threadPool.execute(command); new ThreadA(exchanger).start(); new ThreadB(exchanger).start(); } private static class ThreadA extends Thread{ private Exchanger<String> exchanger; public ThreadA(Exchanger<String> exchanger){ this.exchanger = exchanger; } @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"執行以前"+new Date().toLocaleString()); Thread.sleep(new Random().nextInt(100)*100); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println(Thread.currentThread().getName()+"開始交換數據"+new Date().toLocaleString()); String exchange = exchanger.exchange("A的數據");//給我等着,直到須要交換數據的線程B到來 System.out.println(Thread.currentThread().getName()+"交換後獲得的數據:"+exchange); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class ThreadB extends Thread{ private Exchanger<String> exchanger; public ThreadB(Exchanger<String> exchanger){ this.exchanger = exchanger; } @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"執行以前"+new Date().toLocaleString()); Thread.sleep(new Random().nextInt(100)*100); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println(Thread.currentThread().getName()+"開始交換數據"+new Date().toLocaleString()); String exchange = exchanger.exchange("B的數據"); System.out.println(Thread.currentThread().getName()+"交換後獲得的數據:"+exchange); } catch (InterruptedException e) { e.printStackTrace(); } } } }
阻塞隊列與Semaphore有些類似,但也不一樣,阻塞隊列是一方存放數據,另外一方釋放數據,Semaphore一般則是由同一方設置和釋放信號量(主要控制訪問資源的線程數)。
ArrayBlockingQueue :只有put方法和take方法才具備阻塞功能。
package com.chunjiangchao.thread; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; /** * 兩個生產者,一個消費者 * @author chunjiangchao * */ public class ArrayBlockingQueueDemo { public static void main(String[] args) { final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3); for(int i=0;i<2;i++){ new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(3000); int nextInt = new Random().nextInt(100); System.out.println(Thread.currentThread().getName()+"添加數據爲:"+nextInt); queue.put(nextInt);//當前操做與take操做是阻塞的 System.out.println("當前數據個數"+queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"獲取數據"); Integer take = queue.take();//取數據,若是queue裏面沒有數據,就會一直等,等queue裏面存放數據了,纔會執行後續的代碼 System.out.println("獲取到的數據爲"+take+" ,當前數據個數"+queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
問題:用兩個具備1個空間的隊列來實現同步通知的功能
package com.chunjiangchao.thread; import java.util.concurrent.ArrayBlockingQueue; /** * 使用BlockingQueue實現同步通知的功能 * @author zzb * */ public class BlockingQueueCommunicationDemo { public static void main(String[] args) { final Business business = new Business(); Thread threadMain = new Thread(new Runnable(){ @Override public void run() { while(true){ try { business.main(); } catch (InterruptedException e) { e.printStackTrace(); } } } }); threadMain.setName("threadMain"); threadMain.start(); Thread threadSub = new Thread(new Runnable(){ @Override public void run() { while(true){ try { business.sub(); } catch (InterruptedException e) { e.printStackTrace(); } } } }); threadSub.setName("threadSub"); threadSub.start(); /* * 你一下我一下,你一下,我一下 threadMain執行耗時操做 threadSub執行耗時操做 threadMain執行耗時操做 threadSub執行耗時操做 threadMain執行耗時操做 threadSub執行耗時操做 */ } private static class Business{ private static ArrayBlockingQueue<Integer> mainQueue = new ArrayBlockingQueue<>(1); private static ArrayBlockingQueue<Integer> subQueue = new ArrayBlockingQueue<>(1); static{ try { mainQueue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } public void sub() throws InterruptedException{ subQueue.take();//sub在subQueue取出數據,取不到數據,就等着 System.out.println(Thread.currentThread().getName()+"執行耗時操做"); Thread.sleep(3000); mainQueue.put(1);//存放數據到subQueue,方便 } public void main() throws InterruptedException{ mainQueue.take();//main在mainQueue取出數據,取不到數據,就等着 System.out.println(Thread.currentThread().getName()+"執行耗時操做"); Thread.sleep(3000); subQueue.put(1);//存放數據到subQueue,方便 } } }
未完待續……