java的concurrent包工具類

java的concurrent包工具類

        concurrent的工具包類主要用來協調不一樣線程的運行狀態(完成狀態、完成步調)、對同步資源的訪問限制。
java

一、CountDownLatch

        countDownLatch是經過一個計數器來實現線程的統一執行。首先給latch設置一個初始值,countDown() 每被調用一次,設定的初始值會減1。在設定值達到0以前,使用latch. await() 的線程會一直阻塞。知道latch.countDown()將設定值減爲0,latch.wait()線程纔會運行。dom

public class cdlTest {

	public static void main(String[] args) throws InterruptedException {
		CountDownLatch latch = new CountDownLatch(3);
		Waiter waiter = new Waiter(latch);
		Decrementer decrementer = new Decrementer(latch);

		new Thread(waiter).start();
		new Thread(decrementer).start();
	}

}

class Waiter implements Runnable {
	CountDownLatch latch = null;

	public Waiter(CountDownLatch latch) {
		this.latch = latch;
	}

	@Override
	public void run() {
		System.out.println(" waiter is begin");
		try {
			latch.wait();
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println(" Waiter Released");
	}

}

class Decrementer implements Runnable {
	CountDownLatch latch = null;

	public Decrementer(CountDownLatch latch) {
		this.latch = latch;
	}

	@Override
	public void run() {
		System.out.println(" Decrementer is begin");
		try {
			Thread.sleep(1000);
			this.latch.countDown();
			System.out.println(" Decrementer down 1");
			Thread.sleep(1000);
			this.latch.countDown();
			System.out.println(" Decrementer down 2");
			Thread.sleep(1000);
			this.latch.countDown();
			System.out.println(" Decrementer down 3");
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println(" Decrementer is release ");
	}

}

二、Cyclicbarrier

        循環屏障:會將全部在這個鎖定隊列中的線程都按順序執行。當awite()等時候,線程進入等待隊列,而後等全部的線程(線程數是CyclicBarrier裏的參數)。
ide

public class CycliTest {

	private static int[] timeWalk = { 5, 8, 15, 15, 10 };
	private static int[] timeSelf = { 1, 3, 4, 4, 5 };
	private static int[] timeBus = { 2, 4, 6, 6, 7 };

	static String now() {
		SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
		return sdf.format(new Date()) + ":";
	}

	static class Tour implements Runnable {

		private int[] times;
		private CyclicBarrier barrier;
		private String tourName;

		public Tour(CyclicBarrier barrier, String tourName, int[] times) {
			this.times = times;
			this.barrier = barrier;
			this.tourName = tourName;
		}

		@Override
		public void run() {
			try {
				Thread.sleep(times[0] * 100);
				System.out.println(now() + tourName + " Reached Shenzhen ");
				barrier.await();
				Thread.sleep(times[1] * 100);
				System.out.println(now() + tourName + " Reached Guangzhou ");
				barrier.await();
				Thread.sleep(times[2] * 100);
				System.out.println(now() + tourName + " Reached Shaoguan ");
				barrier.await();
				Thread.sleep(times[3] * 100);
				System.out.println(now() + tourName + " Reached Chengdu ");
				barrier.await();
				Thread.sleep(times[4] * 100);
				System.out.println(now() + tourName + " Reached Wuhan");
				barrier.await();
			} catch (BrokenBarrierException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

		}

	}

	public static void main(String[] args) {
		CyclicBarrier barrier = new CyclicBarrier(3);
		ExecutorService exec = Executors.newCachedThreadPool();
		exec.submit(new Tour(barrier, "WalkTour", timeWalk));
		exec.submit(new Tour(barrier, "SelfTour", timeSelf));
		exec.submit(new Tour(barrier, "BusTour", timeBus));
		exec.shutdown();
	}
}

三、Semaphore

        一個信號量。因爲咱們的鎖都是加在semaphore上的,只有等semaphore有空閒的時候,等待線程纔會繼續執行。對限制線程個數的操做的共享資源能夠使用。
工具

public class MyTest extends Thread {

	private Semaphore semaphore;

	public MyTest(Semaphore s) {
		this.semaphore = s;
	}

	public void run() {
		try {
			if (semaphore.availablePermits() > 0) {
				System.out.println("thread name [" + this.getName() + "] can take");
			} else {
				System.out.println(" thread name [" + this.getName() + "] must waite");
			}
			semaphore.acquire();//獲取
			System.out.println("thread name[" + this.getName() + "] get resource");
			Thread.sleep((long) (Math.random() * 1000));
			System.out.println("thread name[" + this.getName() + "] release resource");
			semaphore.release();//釋放
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		ExecutorService es = Executors.newCachedThreadPool();
		Semaphore s = new Semaphore(2);
		for (int i = 0; i < 50; i++) {
			es.submit(new MyTest(s));
		}
		es.shutdown();
	}

}
相關文章
相關標籤/搜索