concurrent的工具包類主要用來協調不一樣線程的運行狀態(完成狀態、完成步調)、對同步資源的訪問限制。
java
countDownLatch是經過一個計數器來實現線程的統一執行。首先給latch設置一個初始值,countDown() 每被調用一次,設定的初始值會減1。在設定值達到0以前,使用latch. await() 的線程會一直阻塞。知道latch.countDown()將設定值減爲0,latch.wait()線程纔會運行。dom
public class cdlTest { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); Waiter waiter = new Waiter(latch); Decrementer decrementer = new Decrementer(latch); new Thread(waiter).start(); new Thread(decrementer).start(); } } class Waiter implements Runnable { CountDownLatch latch = null; public Waiter(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println(" waiter is begin"); try { latch.wait(); } catch (Exception e) { e.printStackTrace(); } System.out.println(" Waiter Released"); } } class Decrementer implements Runnable { CountDownLatch latch = null; public Decrementer(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println(" Decrementer is begin"); try { Thread.sleep(1000); this.latch.countDown(); System.out.println(" Decrementer down 1"); Thread.sleep(1000); this.latch.countDown(); System.out.println(" Decrementer down 2"); Thread.sleep(1000); this.latch.countDown(); System.out.println(" Decrementer down 3"); } catch (Exception e) { e.printStackTrace(); } System.out.println(" Decrementer is release "); } }
循環屏障:會將全部在這個鎖定隊列中的線程都按順序執行。當awite()等時候,線程進入等待隊列,而後等全部的線程(線程數是CyclicBarrier裏的參數)。
ide
public class CycliTest { private static int[] timeWalk = { 5, 8, 15, 15, 10 }; private static int[] timeSelf = { 1, 3, 4, 4, 5 }; private static int[] timeBus = { 2, 4, 6, 6, 7 }; static String now() { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); return sdf.format(new Date()) + ":"; } static class Tour implements Runnable { private int[] times; private CyclicBarrier barrier; private String tourName; public Tour(CyclicBarrier barrier, String tourName, int[] times) { this.times = times; this.barrier = barrier; this.tourName = tourName; } @Override public void run() { try { Thread.sleep(times[0] * 100); System.out.println(now() + tourName + " Reached Shenzhen "); barrier.await(); Thread.sleep(times[1] * 100); System.out.println(now() + tourName + " Reached Guangzhou "); barrier.await(); Thread.sleep(times[2] * 100); System.out.println(now() + tourName + " Reached Shaoguan "); barrier.await(); Thread.sleep(times[3] * 100); System.out.println(now() + tourName + " Reached Chengdu "); barrier.await(); Thread.sleep(times[4] * 100); System.out.println(now() + tourName + " Reached Wuhan"); barrier.await(); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(3); ExecutorService exec = Executors.newCachedThreadPool(); exec.submit(new Tour(barrier, "WalkTour", timeWalk)); exec.submit(new Tour(barrier, "SelfTour", timeSelf)); exec.submit(new Tour(barrier, "BusTour", timeBus)); exec.shutdown(); } }
一個信號量。因爲咱們的鎖都是加在semaphore上的,只有等semaphore有空閒的時候,等待線程纔會繼續執行。對限制線程個數的操做的共享資源能夠使用。
工具
public class MyTest extends Thread { private Semaphore semaphore; public MyTest(Semaphore s) { this.semaphore = s; } public void run() { try { if (semaphore.availablePermits() > 0) { System.out.println("thread name [" + this.getName() + "] can take"); } else { System.out.println(" thread name [" + this.getName() + "] must waite"); } semaphore.acquire();//獲取 System.out.println("thread name[" + this.getName() + "] get resource"); Thread.sleep((long) (Math.random() * 1000)); System.out.println("thread name[" + this.getName() + "] release resource"); semaphore.release();//釋放 } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Semaphore s = new Semaphore(2); for (int i = 0; i < 50; i++) { es.submit(new MyTest(s)); } es.shutdown(); } }