上次面試中問到AQS簡直不要太痛苦,全是問的源碼。可是源碼有時間仍是要看看的,畢竟對於提高咱們的寫代碼的能力仍是有幫助的。今天的面試緊接上回的AQS,內容是基於AQS實現的四大併發工具類: CyclicBarrier,CountDownLatch,Semaphore和Exchanger,簡要分析實現原理,着重講述如何使用。java
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); //parties變量表示攔截線程的總數量,count變量表示攔截線程的剩餘須要數量 private final int parties; //barrierCommand變量爲CyclicBarrier接收的Runnable命令,用於在線程到達屏障時,優先執行barrierCommand,用於處理更加複雜的業務場景。 private final Runnable barrierCommand; //generation變量表示CyclicBarrier的更新換代 private Generation generation = new Generation(); 複製代碼
能夠看出CyclicBarrier內部是使用重入鎖和Condition的。它有兩個構造函數:mysql
/**
建立一個新的CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動barrier時執行給定的屏障操做,該操做由最後一個進入barrier的線程執行。 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** 建立一個新的CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動barrier時執行預約義的操做。 */ public CyclicBarrier(int parties) { this(parties, null); } 複製代碼
public int await(long timeout, TimeUnit unit)
throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //獲取鎖 final ReentrantLock lock = this.lock; lock.lock(); try { //分代 final Generation g = generation; //當前generation已損壞,拋出BrokenBarrierException異常 if (g.broken) throw new BrokenBarrierException(); //若是線程中斷,終止CyclicBarrier if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //進來一個線程,count-1 int index = --count; //若是count==0表示全部線程均已到達屏障,能夠觸發barrierCommand任務 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //喚醒全部等待線程,並更新generation nextGeneration(); return 0; } finally { //若是barrierCommand執行失敗,終止CyclicBarrier if (!ranAction) breakBarrier(); } } for (;;) { try { //若是不是超時等待,則調用Condition.await()方法等待 if (!timed) trip.await(); else if (nanos > 0L) //若是是超時等待,則調用Condition.awaitNanos()等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); //generation已經更新,返回Index if (g != generation) return index; //超時等待而且時間已經到了,終止CyclicBarrier,並拋出超時異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //釋放鎖 lock.unlock(); } 複製代碼
若是該線程不是到達的最後一個線程,則它會一直處於等待狀態,除非發生如下狀況:
一、最後一個到達:即index=0
二、超出了等待時間。
三、其餘的某個線程中斷當前線程。
四、其餘某個線程中斷另外一個等待的線程。
五、其餘某個線程在等待barrier超時。
六、其餘某個線程在此barrier調用reset方法,用於將該屏障置爲初始狀態。web
public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier; private static final Integer THREAD_COUNT = 10; static class CyclicBarrierThread implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()+"到教室了"); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String [] args) { cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new Runnable() { @Override public void run() { System.out.println("同窗們都到齊了,開始上課吧..."); } }); for (int i=0; i< THREAD_COUNT; i++) { Thread thread = new Thread(new CyclicBarrierThread()); thread.start(); } } } 複製代碼
運行結果以下:面試
面試官:有一個和CyclicBarrier相似的工具類叫CountDownLatch,你能說下嗎?redis
我:CyclicBarrier描述的是「容許一組線程相互等待,直到到達某個公共屏障點,纔會進行後續任務」,而CountDownLatch所描述的是「在完成一組正在其餘線程中執行的操做以前,它容許 一個或多個線程一直等待」。在API中是這樣描述的:用給定的計數初始化CountDownLatch。因爲調用了countDown方法,因此在當前計數到達零以前,await方法會一直受阻塞。以後,會釋放 全部等待的線程,await的全部後續調用都將當即返回。這種現象只出現一次(計數沒法被重置。若是須要重置計數,請考慮使用CyclicBarrier)。
CountDownLatch是經過一個計數器來實現的,當咱們在new一個CountDownLatch對象的時候,須要傳入計數器的值,該值表示線程的數量。每當一個線程完成本身的任務後,計數器的值就會 減一。當計數器的值變爲0時,就表示全部線程均已完成任務,而後就能夠恢復等待的線程繼續執行了。
CountDownLatch和CyclicBarrier仍是有一點區別的:
一、CountDownLatch的做用是容許1或多個線程等待其餘線程完成執行;而CyclicBarrier則是容許多個線程互相等待。
二、CountDownLatch的計數器沒法被重置。CyclicBarrier的計數器能夠被重置後使用。spring
面試官:你能說下CountDownlatch是怎麼實現的嗎?sql
我:CountDownlatch內部依賴Sync實現,而Sync繼承AQS。以下圖:數據庫
CountDownlatch僅提供了一個構造方法,以下:編程
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 複製代碼
再來看看Sync,是CountDownlatch的一個內部類。mybatis
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } //獲取同步狀態 int getCount() { return getState(); } //嘗試獲取同步狀態 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //嘗試釋放同步狀態 protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } 複製代碼
CountDownLatch內部經過共享鎖實現:
一、在建立CountDownLatch實例時,須要傳遞一個int型參數:count,該參數爲計數器的初始值,也能夠理解爲該共享鎖能夠獲取的總次數。
二、當某個線程調用await()方法,程序首先判斷count的值是否爲0,若是不爲0的話,則會一直等待直到爲0爲止。
三、當其餘線程調用countDown()方法時,則執行釋放共享鎖狀態,使count-1。
四、注意CountDownLatch不能回滾重置。
public class CountDownLatchTest {
private static final Integer STUDENT_COUNT = 10; private static CountDownLatch countDownLatch = new CountDownLatch(STUDENT_COUNT); static class TeacherThread implements Runnable { @Override public void run() { System.out.println("老師來了,等"+ STUDENT_COUNT+"位同窗都到教室了纔開始上課"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(STUDENT_COUNT+"位同窗都到齊了,開始上課!"); } } static class StudentThread implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()+"進了教室"); countDownLatch.countDown(); } } public static void main(String [] args) { Thread teacher = new Thread(new TeacherThread()); teacher.start(); for (int i=0; i<STUDENT_COUNT; i++) { Thread student = new Thread(new StudentThread()); student.start(); } } } 複製代碼
從上面的分析能夠看出:信號量Semaphore是一個非負整數(>=1)。當一個線程想要訪問某個共享資源時,它必須先獲取Semaphore。當Semaphore>0時,獲取該資源並使Semaphore-1。 若是Semaphore的值==0,則表示所有的共享資源已經被線程所有佔用,新來的線程必須等待其餘線程釋放資源。當線程釋放資源時,Semaphore則+1。
public class SemaphoreTest {
static class Parking { private Semaphore semaphore; Parking(int count) { semaphore = new Semaphore(count); } public void park() { try { //獲取信號量 semaphore.acquire(); long time = (long) (Math.random()*10+1); System.out.println(Thread.currentThread().getName()+"進入停車場停車,停車時間:"+time+"秒"); //模擬停車時間 Thread.sleep(time); System.out.println(Thread.currentThread().getName()+"開出停車場..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { //釋放信號量(跟lock的用法差很少) semaphore.release(); } } } static class Car implements Runnable{ private Parking parking; Car(Parking parking) { this.parking = parking; } /** * 每輛車至關於一個線程,線程的任務就是停車 */ @Override public void run() { parking.park(); } } public static void main(String [] args) { //假設有3個停車位 Parking parking = new Parking(3); //這時候同時來了5輛車,只有3輛車能夠進去停車,其他2輛車須要等待有空餘車位以後才能進去停車。 for (int i=0; i<5; i++) { Thread thread = new Thread(new Car(parking)); thread.start(); } } } 複製代碼
運行結果:
public class ExchangerTest {
static class ThreadA implements Runnable { private Exchanger<String> exchanger; ThreadA (Exchanger<String> exchanger) { this.exchanger = exchanger; } @Override public void run() { try { //模擬業務代碼 Long time = (long)(Math.random()*10+1)*10; System.out.println("線程A等待了"+time+"秒"); Thread.sleep(time); //線程間數據交換 System.out.println("在線程A獲得線程B的值:"+ exchanger.exchange("我是線程A")); } catch (InterruptedException e) { e.printStackTrace(); } } } static class ThreadB implements Runnable { private Exchanger<String> exchanger; ThreadB(Exchanger<String> exchanger) { this.exchanger = exchanger; } @Override public void run() { try { //模擬業務代碼 Long time = (long)(Math.random()*10+1)*10; System.out.println("線程B等待了"+time+"秒"); Thread.sleep(time); //線程間數據交換 System.out.println("在線程B獲得線程A的值:"+ exchanger.exchange("我是線程B")); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String [] args) { Exchanger<String> exchanger = new Exchanger<>(); //線程A和線程B要使用同一個exchanger纔有用 Thread threadA = new Thread(new ThreadA(exchanger)); Thread threadB = new Thread(new ThreadB(exchanger)); threadA.start(); threadB.start(); } } 複製代碼
運行結果:
今天面試了嗎系列
Java併發編程系列:
https://juejin.im/post/5ef764985188252e7c21ad5c
https://juejin.im/post/5ee82c736fb9a047fe5c1af3
redis:
https://juejin.im/post/5dccf260f265da0bf66b626d
spring:
https://juejin.im/post/5e6d993cf265da575b1bd4af
mybatis:
https://juejin.im/post/5e80b6d76fb9a03c3f1e92a2#comment
數據庫系列
mysql索引:
https://juejin.im/post/5d67702cf265da03f333664c
數據庫鎖:
https://juejin.im/post/5dbbc1d06fb9a0205425309e
分庫分表:
https://juejin.im/post/5dc77a9451882559465e390b
數據庫事務:
https://juejin.im/post/5dcb9c38f265da4d2971038c
線上問題系列
https://juejin.im/post/5ef055b7e51d45740e4275e2
java基礎 https://juejin.im/post/5d5e2616f265da03b638b28a https://juejin.im/post/5d427f306fb9a06b122f1b94