閉鎖容許一個或者多個線程等待其餘線程都完成了才繼續執行。CountDownLatch 是一種閉鎖的實現,使得一個或多個線程等待一組事情發生。經過計數器表示須要等待的事件數量;使用countDown()
方法將計數器減去1,表示有一個事件發生;使用await()
方法阻塞當前線程,等待計數器爲0,也就是全部須要等待的事情發生。java
CountDownLatch 不能從新初始化或者修改內部計數器的值。編程
CountDownLatch 內部依賴內部類 Sync 實現,而 Sync 類繼承於 AQS。其內部經過共享鎖實現。bash
示例:併發
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(5);
/** * Boss線程,等待員工到達開會 */
static class BossThread extends Thread{
@Override
public void run() {
System.out.println("Boss在會議室等待,總共有" + countDownLatch.getCount() + "我的開會...");
try {
//Boss等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("全部人都已經到齊了,開會吧...");
}
}
//員工到達會議室
static class EmpleoyeeThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ",到達會議室....");
//員工到達會議室 count - 1
countDownLatch.countDown();
}
}
public static void main(String[] args){
//Boss線程啓動
new BossThread().start();
for(int i = 0 ; i < countDownLatch.getCount() ; i++){
new EmpleoyeeThread().start();
}
}
}
複製代碼
運行結果:dom
Boss在會議室等待,總共有5我的開會...
Thread-2,到達會議室....
Thread-3,到達會議室....
Thread-1,到達會議室....
Thread-5,到達會議室....
Thread-4,到達會議室....
全部人都已經到齊了,開會吧...
複製代碼
閉鎖是一次性對象,一旦進入終止狀態,就不能被重置。屏障相似於閉鎖,阻塞一組進程直到某個事件發生。也就是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。ide
CycliBarrier 內部使用了可重用鎖 ReentrantLock 和 Condition。線程執行await()
方法後,計數器減1,進行等待,直到計數器爲0,全部調用了await()
方法的線程繼續執行。工具
CycliBarrier適用於多個線程結果合併的場景。ui
示例:this
public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier;
static class CyclicBarrierThread extends Thread{
public void run() {
System.out.println(Thread.currentThread().getName() + "到了");
//等待
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args){
cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("人到齊了,開會吧....");
}
});
for(int i = 0 ; i < 5 ; i++){
new CyclicBarrierThread().start();
}
}
}
複製代碼
運行結果:spa
Thread-0到了
Thread-1到了
Thread-4到了
Thread-2到了
Thread-3到了
人到齊了,開會吧...
複製代碼
CountDownLatch 的計數器只能使用一次,CyclicBarrier 的計數器可使用 reset() 方法重置。
閉鎖只會阻塞一條線程,目的是爲了讓該條任務線程知足條件後執行; 而同步屏障會阻塞全部線程,目的是爲了讓全部線程同時執行。
信號量用於控制同時訪問某個特定資源的操做數量,或者執行某個指定操做的數量。計數信號量還能夠用來實現某種資源池,或者對容器施加邊界。
信號量能夠用於實現資源池,也能夠用於將容器變爲有界阻塞容器。信號量管理着一組虛擬的許可,在執行操做時首先獲取許可,並在使用之後釋放許可。若是沒有許可,將阻塞直到有許可或被中斷,超時。
信號量的使用場景是,有m個資源,n個線程,且n>m,同一時刻只能容許m條線程訪問資源。
信號量內部一樣依賴於繼承自 AQS 的 Sync 類,且包含兩個子類,分別是 FairSync 和 NonfairSync。
信號量經過acquire()
方法獲取許可;經過release()
方法釋放許可。
示例:
public class SemaphoreCase {
private static class Parking{
private Semaphore semaphore;
Parking(int count){
semaphore=new Semaphore(count);
}
public void park(){
try {
semaphore.acquire();
long time=(long) (Math.random()*10);
System.out.println(Thread.currentThread().getName()+"進入停車場停車"+time+"秒");
Thread.sleep(time);
System.out.println(Thread.currentThread().getName()+"開出停車場");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
private static class Car extends Thread{
Parking parking;
Car(Parking parking){
this.parking=parking;
}
@Override
public void run() {
parking.park();
}
}
public static void main(String[] args) {
Parking parking=new Parking(3);
for(int i=0;i<5;i++){
new Car(parking).start();
}
}
}
複製代碼
運行結果爲:
Thread-2進入停車場停車7秒
Thread-1進入停車場停車9秒
Thread-0進入停車場停車4秒
Thread-0開出停車場
Thread-3進入停車場停車4秒
Thread-2開出停車場
Thread-4進入停車場停車3秒
Thread-1開出停車場
Thread-3開出停車場
Thread-4開出停車場
複製代碼
Exchanger 是一個用於線程間協做的工具類,用於線程間的數據交換。它提供了一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。兩個線程經過exchange()
方法交換數據,一個線程執行了該方法,會一直等待另外一個線程執行該方法,當兩個線程都到達同步點,這兩個線程就能夠交換數據。
示例:
public class ExchangerCase {
private static class Producer implements Runnable{
private List<String> buffer;
private Exchanger<List<String>> exchanger;
Producer(List<String> buffer,Exchanger<List<String>> exchanger){
this.buffer=buffer;
this.exchanger=exchanger;
}
@Override
public void run() {
for(int i=1;i<5;i++){
System.out.println("生產者生產次數:"+i);
for(int j=1;j<=3;j++){
System.out.println("生產者裝入"+i+"--"+j);
buffer.add("buffer: "+i+"--"+j);
}
System.out.println("生產者裝滿,等待消費者交換");
try {
exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class Consumer implements Runnable{
private List<String> buffer;
private Exchanger<List<String>> exchanger;
Consumer(List<String> buffer,Exchanger<List<String>> exchanger){
this.buffer=buffer;
this.exchanger=exchanger;
}
@Override
public void run() {
for(int i=1;i<5;i++){
try {
buffer=exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者消費次數:"+i);
for(int j=1;j<=3;j++) {
System.out.println("消費者消費" +buffer.get(0));
buffer.remove(0);
}
}
}
}
public static void main(String[] args) {
List<String> buffer1 = new ArrayList<String>();
List<String> buffer2 = new ArrayList<String>();
Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
Thread producerThread = new Thread(new Producer(buffer1,exchanger));
Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));
producerThread.start();
consumerThread.start();
}
}
複製代碼
運行結果
生產者生產次數:1
生產者裝入1--1
生產者裝入1--2
生產者裝入1--3
生產者裝滿,等待消費者交換
生產者生產次數:2
生產者裝入2--1
生產者裝入2--2
生產者裝入2--3
生產者裝滿,等待消費者交換
消費者消費次數:1
消費者消費buffer: 1--1
消費者消費buffer: 1--2
消費者消費buffer: 1--3
消費者消費次數:2
消費者消費buffer: 2--1
消費者消費buffer: 2--2
生產者生產次數:3
生產者裝入3--1
消費者消費buffer: 2--3
生產者裝入3--2
生產者裝入3--3
生產者裝滿,等待消費者交換
生產者生產次數:4
生產者裝入4--1
生產者裝入4--2
消費者消費次數:3
消費者消費buffer: 3--1
生產者裝入4--3
生產者裝滿,等待消費者交換
消費者消費buffer: 3--2
消費者消費buffer: 3--3
消費者消費次數:4
消費者消費buffer: 4--1
消費者消費buffer: 4--2
消費者消費buffer: 4--3
複製代碼
在Exchanger中,若是一個線程已經到達了exchanger節點時,對於它的夥伴節點的狀況有三種:
參考資料