同步工具類:CountDownLatch、CyclicBarrier和Exchanger

1. CountDownLatch

1.1說明:

        CountDownLatch能夠理解爲一個計數器在初始化時設置初始值,當一個線程須要等待某些操做先完成時,須要調用await()方法。這個方法讓線程進入休眠狀態直到等待的全部線程都執行完成。每調用一次countDown()方法內部計數器減1,當計數達到0時,則全部的等待着開始執行。 能夠實現一個線程(也能夠是多個線程)等待其餘線程來喚醒,也能夠實現一個線程通知多個線程的效果。java

  •  舉例:相似裁判一聲口令下,全部的運動員才能開始奔跑,或者全部的運動員奔跑完後纔有比賽結果。
  • 核心方法兩個:countDown()和await()

  (1)countDown():使CountDownLatch維護的內部計數器減1,每一個被等待的線程完成的時候調用。安全

  (2)await():線程在執行到CountDownLatch的時候會將此線程置於休眠。併發

 2.2使用案例:

        模擬裁判發出命令後,3個運動員以不一樣的速度奔跑。等3個運動員都跑完後,裁判獲得比賽結果。dom

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * CountDownLatch demo :模擬裁判發出命令後,3個運動員以不一樣的速度奔跑。等3個運動員都跑完後,裁判獲得比賽結果
 * 
 * @author Smile
 */
public class CountdownLatchTest {

	public static void main(String[] args) throws InterruptedException {
		ExecutorService service = Executors.newCachedThreadPool();
		// 建立一個「命令」計數器
		final CountDownLatch cdOrder = new CountDownLatch(1);
		// 建立三個「迴應」計數器
		final CountDownLatch cdAnswer = new CountDownLatch(3);
		for (int i = 0; i < 3; i++) {
			Runnable runnable = new Runnable() {
				@Override
				public void run() {
					try {
						System.out.println("線程" + Thread.currentThread().getName() + "正在準備接收命令");
						// 等待「命令」計數器爲0
						cdOrder.await();

						System.out.println("線程" + Thread.currentThread().getName() + "已收命令");
						Thread.sleep((long) (Math.random() * 10000));
						System.out.println("線程" + Thread.currentThread().getName() + "處理結束!迴應命令處理結果");
						// 「迴應」計數器減1
						cdAnswer.countDown();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

				}
			};
			service.execute(runnable);
		}

		// 主線程發命令CountDownLatch cdOrder命令
		Thread.sleep((long) (Math.random() * 10000));
		System.out.println("線程" + Thread.currentThread().getName() + "即將發佈命令");
		// 「命令」計數器減1,「命令」計數器爲0,開始喚醒cdOrder.await()的線程
		cdOrder.countDown();
		System.out.println("線程" + Thread.currentThread().getName() + "已發佈命令,等待處理結果……");

		// 等待「迴應」計數器爲0時,再繼續執行
		cdAnswer.await();
		System.out.println("線程" + Thread.currentThread().getName() + "已收到所有結果結果。");

		service.shutdown();
	}

}

運行結果:ide

2. CyclicBarrier

 2.1說明:

        CyclicBarrier是一個同步工具類,它容許一組線程互相等待,直到到達某個公共屏障點。工具

  • 舉例:整個公司的人利用週末時間到集體郊遊同樣,先各自從家裏出發到公司集合,再同時出發到公園分開遊玩,在餐館集合後再同時開始聚餐。這裏的「公司集合」、「餐館集合」就是指的是公共屏障點。
  • 經常使用方法:

     (1)await()方法:在調用await()方法後,CyclicBarrier將阻塞這個線程並將它置入休眠狀態等待其它線程的到來。spa

2.2 與CountDownLatch的區別:

    (1)與CountDownLatch不一樣的是該barrier在釋放等待線程後能夠重用,因此稱它爲循環(Cyclic)的屏障(Barrier)。線程

    (2)CountDownLatch主要是實現了1個或N個線程須要等待其餘線程完成某項操做以後才能繼續往下執行操做,描述的是1個線程或N個線程等待其餘線程的關係。CyclicBarrier主要是實現了多個線程之間相互等待,直到全部的線程都知足了條件以後各自才能繼續執行後續的操做,描述的多個線程內部相互等待的關係code

 2.3使用案例:

        模擬線程一、線程二、線程3以不一樣時間到達集合點1後,再一塊兒以不一樣的速度到達集合點2,等全部線程都到達集合點2後,再一塊兒出發到集合點3。對象

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * CyclicBarrier
 * Demo:模擬線程一、線程二、線程3以不一樣時間到達集合點1後,再一塊兒以不一樣的速度到達集合點2,等全部線程都到達集合點2後,再一塊兒出發到集合點3
 * 
 * @author Smile
 */
public class CyclicBarrierTest {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		// 設置屏障,此屏障須要等待3個線程
		final CyclicBarrier cb = new CyclicBarrier(3);
		for (int i = 0; i < 3; i++) {
			Runnable runable = new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep((long) (Math.random() * 10000));
						System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點1, 當前已有"
								+ (cb.getNumberWaiting() + 1) + "個併發"
								+ ((2 == cb.getNumberWaiting()) ? "。線程已經所有到達集合點1,繼續往下走!" : ""));
						cb.await(); // 設置第一次屏障,需等3個線程所有執行到此時,才繼續往下

						Thread.sleep((long) (Math.random() * 10000));
						System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點2, 當前已有"
								+ (cb.getNumberWaiting() + 1) + "個併發"
								+ ((2 == cb.getNumberWaiting()) ? "。線程已經所有到達集合點2,繼續往下走!" : ""));
						cb.await(); // 設置第二次屏障,需等3個線程所有執行到此時,才繼續往下

						Thread.sleep((long) (Math.random() * 10000));
						System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點3, 當前已有"
								+ (cb.getNumberWaiting() + 1) + "個併發"
								+ ((2 == cb.getNumberWaiting()) ? "。線程已經所有到達集合點3,完畢!" : ""));
						cb.await(); // 設置第三次屏障,需等3個線程所有執行到此時,才繼續往下

					} catch (InterruptedException e) {
						e.printStackTrace();
					} catch (BrokenBarrierException e) {
						e.printStackTrace();
					}
				}
			};
			service.execute(runable);
		}
		service.shutdown();
	}
}

運行結果:

3. Exchanger

3.1說明

        用於實現兩我的之間的數據交換,每一個人在完成必定的事務後想與對方交互數據,第一個先拿出數據的人將一直等待第二我的拿着數據到來時,才彼此交換數據。

  • 核心方法:

    public V exchange(V x):等待另外一個線程到達交換點(若是當前線程沒有被中斷),而後將已知的對象傳給它,返回接收的對象。

 3.2使用案例:

        模擬兩個線程交換數據:當線程A調用Exchange對象的exchange()方法後,他會陷入阻塞狀態,直到線程B也調用了exchange()方法,而後以線程安全的方式交換數據,以後線程A和B繼續運行。

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Exchanger Demo:兩個線程之間的信息交換
 * 用於實現兩我的之間的數據交換,每一個人在完成必定的事務後想與對方交互數據,第一個先拿出數據的人將一直等待第二我的拿着數據到來時,才彼此交換數據
 * 
 * @author xiao
 *
 */
public class ExchangerTest {

	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final Exchanger exchanger = new Exchanger();
		service.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String data1 = "XMSSS";
					System.out.println("線程" + Thread.currentThread().getName() + "正在把數據 " + data1 + "交換出去");
					Thread.sleep((long) (Math.random() * 10000));
					String data2 = (String) exchanger.exchange(data1);
					System.out.println("線程" + Thread.currentThread().getName() + "換回的數據爲 " + data2);

				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});

		service.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String data1 = "GRSXX";
					System.out.println("線程" + Thread.currentThread().getName() + "正在把數據 " + data1 + "交換出去");
					Thread.sleep((long) (Math.random() * 10000));
					String data2 = (String) exchanger.exchange(data1);
					System.out.println("線程" + Thread.currentThread().getName() + "換回的數據爲 " + data2);

				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
	}
}

運行效果:

相關文章
相關標籤/搜索