《今天面試了嗎》-併發編程之AQS同步工具類

前言

上次面試中問到AQS簡直不要太痛苦,全是問的源碼。可是源碼有時間仍是要看看的,畢竟對於提高咱們的寫代碼的能力仍是有幫助的。今天的面試緊接上回的AQS,內容是基於AQS實現的四大併發工具類: CyclicBarrier,CountDownLatch,Semaphore和Exchanger,簡要分析實現原理,着重講述如何使用。java

面試環節

  • 面試官:上次聊到AQS,你在開發過程當中用過AQS的幾個工具類嗎?好比 CyclicBarrier...
  • 我:用過, CyclicBarrier是一個同步輔助類。它容許一組線程互相等待,直到到達某個公共屏障點。在涉及一組固定大小的線程的程序裏,這些線程必須不時的互相等待,此時CyclicBarrier 頗有用。由於CyclicBarrier在釋放等待線程後能夠重用,所以成爲循環的屏障。 下面來看下CyclicBarrier的定義:
private final ReentrantLock lock = new ReentrantLock();
  private final Condition trip = lock.newCondition();   //parties變量表示攔截線程的總數量,count變量表示攔截線程的剩餘須要數量  private final int parties;   //barrierCommand變量爲CyclicBarrier接收的Runnable命令,用於在線程到達屏障時,優先執行barrierCommand,用於處理更加複雜的業務場景。  private final Runnable barrierCommand;   //generation變量表示CyclicBarrier的更新換代  private Generation generation = new Generation(); 複製代碼

能夠看出CyclicBarrier內部是使用重入鎖和Condition的。它有兩個構造函數:mysql

/**
 建立一個新的CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動barrier時執行給定的屏障操做,該操做由最後一個進入barrier的線程執行。  */  public CyclicBarrier(int parties, Runnable barrierAction) {  if (parties <= 0) throw new IllegalArgumentException();  this.parties = parties;  this.count = parties;  this.barrierCommand = barrierAction;  }  /**  建立一個新的CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動barrier時執行預約義的操做。  */  public CyclicBarrier(int parties) {  this(parties, null);  } 複製代碼
  • 面試官:那CyclicBarrier是怎麼讓線程到達屏障後處於等待狀態的呢?
  • 我:使用await()方法,每一個線程調用await()方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。當全部線程都到達了屏障,結束阻塞,全部線程可繼續執行後續邏輯。
public int await(long timeout, TimeUnit unit)
 throws InterruptedException,  BrokenBarrierException,  TimeoutException {  return dowait(true, unit.toNanos(timeout));  }    private int dowait(boolean timed, long nanos)  throws InterruptedException, BrokenBarrierException,  TimeoutException {  //獲取鎖  final ReentrantLock lock = this.lock;  lock.lock();  try {  //分代  final Generation g = generation;   //當前generation已損壞,拋出BrokenBarrierException異常  if (g.broken)  throw new BrokenBarrierException();  //若是線程中斷,終止CyclicBarrier  if (Thread.interrupted()) {  breakBarrier();  throw new InterruptedException();  }   //進來一個線程,count-1  int index = --count;  //若是count==0表示全部線程均已到達屏障,能夠觸發barrierCommand任務  if (index == 0) { // tripped  boolean ranAction = false;  try {  final Runnable command = barrierCommand;  if (command != null)  command.run();  ranAction = true;  //喚醒全部等待線程,並更新generation  nextGeneration();  return 0;  } finally {  //若是barrierCommand執行失敗,終止CyclicBarrier  if (!ranAction)  breakBarrier();  }  }    for (;;) {  try {  //若是不是超時等待,則調用Condition.await()方法等待  if (!timed)  trip.await();  else if (nanos > 0L)  //若是是超時等待,則調用Condition.awaitNanos()等待  nanos = trip.awaitNanos(nanos);  } catch (InterruptedException ie) {  if (g == generation && ! g.broken) {  breakBarrier();  throw ie;  } else {  Thread.currentThread().interrupt();  }  }   if (g.broken)  throw new BrokenBarrierException();   //generation已經更新,返回Index  if (g != generation)  return index;  //超時等待而且時間已經到了,終止CyclicBarrier,並拋出超時異常  if (timed && nanos <= 0L) {  breakBarrier();  throw new TimeoutException();  }  }  } finally {  //釋放鎖  lock.unlock();  } 複製代碼

若是該線程不是到達的最後一個線程,則它會一直處於等待狀態,除非發生如下狀況:
一、最後一個到達:即index=0
二、超出了等待時間。
三、其餘的某個線程中斷當前線程。
四、其餘某個線程中斷另外一個等待的線程。
五、其餘某個線程在等待barrier超時。
六、其餘某個線程在此barrier調用reset方法,用於將該屏障置爲初始狀態。web

  • 面試官:那CyclicBarrier什麼場景下用呢?
  • 我:CyclicBarrier適用於多線程合併的操做,用於多線程計算數據,最後合併計算結果的應用場景。舉個例子:
public class CyclicBarrierTest {
  private static CyclicBarrier cyclicBarrier;   private static final Integer THREAD_COUNT = 10;   static class CyclicBarrierThread implements Runnable {  @Override  public void run() {  System.out.println(Thread.currentThread().getName()+"到教室了");  try {  cyclicBarrier.await();  } catch (Exception e) {  e.printStackTrace();  }  }  }   public static void main(String [] args) {  cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new Runnable() {  @Override  public void run() {  System.out.println("同窗們都到齊了,開始上課吧...");  }  });   for (int i=0; i< THREAD_COUNT; i++) {  Thread thread = new Thread(new CyclicBarrierThread());  thread.start();  }   } }  複製代碼

運行結果以下:面試

  • 面試官:有一個和CyclicBarrier相似的工具類叫CountDownLatch,你能說下嗎?redis

  • 我:CyclicBarrier描述的是「容許一組線程相互等待,直到到達某個公共屏障點,纔會進行後續任務」,而CountDownLatch所描述的是「在完成一組正在其餘線程中執行的操做以前,它容許 一個或多個線程一直等待」。在API中是這樣描述的:用給定的計數初始化CountDownLatch。因爲調用了countDown方法,因此在當前計數到達零以前,await方法會一直受阻塞。以後,會釋放 全部等待的線程,await的全部後續調用都將當即返回。這種現象只出現一次(計數沒法被重置。若是須要重置計數,請考慮使用CyclicBarrier)
    CountDownLatch是經過一個計數器來實現的,當咱們在new一個CountDownLatch對象的時候,須要傳入計數器的值,該值表示線程的數量。每當一個線程完成本身的任務後,計數器的值就會 減一。當計數器的值變爲0時,就表示全部線程均已完成任務,而後就能夠恢復等待的線程繼續執行了。
    CountDownLatch和CyclicBarrier仍是有一點區別的:
    一、CountDownLatch的做用是容許1或多個線程等待其餘線程完成執行;而CyclicBarrier則是容許多個線程互相等待。
    二、CountDownLatch的計數器沒法被重置。CyclicBarrier的計數器能夠被重置後使用。spring

  • 面試官:你能說下CountDownlatch是怎麼實現的嗎?sql

  • 我:CountDownlatch內部依賴Sync實現,而Sync繼承AQS。以下圖:數據庫

CountDownlatch僅提供了一個構造方法,以下:編程

public CountDownLatch(int count) {
 if (count < 0) throw new IllegalArgumentException("count < 0");  this.sync = new Sync(count);  } 複製代碼

再來看看Sync,是CountDownlatch的一個內部類。mybatis

private static final class Sync extends AbstractQueuedSynchronizer {
 private static final long serialVersionUID = 4982264981922014374L;   Sync(int count) {  setState(count);  }   //獲取同步狀態  int getCount() {  return getState();  }   //嘗試獲取同步狀態  protected int tryAcquireShared(int acquires) {  return (getState() == 0) ? 1 : -1;  }   //嘗試釋放同步狀態  protected boolean tryReleaseShared(int releases) {  for (;;) {  int c = getState();  if (c == 0)  return false;  int nextc = c-1;  if (compareAndSetState(c, nextc))  return nextc == 0;  }  }  } 複製代碼

CountDownLatch內部經過共享鎖實現:
一、在建立CountDownLatch實例時,須要傳遞一個int型參數:count,該參數爲計數器的初始值,也能夠理解爲該共享鎖能夠獲取的總次數。
二、當某個線程調用await()方法,程序首先判斷count的值是否爲0,若是不爲0的話,則會一直等待直到爲0爲止。
三、當其餘線程調用countDown()方法時,則執行釋放共享鎖狀態,使count-1。
四、注意CountDownLatch不能回滾重置。

  • 面試官:那你說下CountDownLatch是怎麼用的?
  • 我:
    一、CountDownlatch提供了await()方法,來使當前線程在鎖存器遞減倒數至0之前一直等待,除非線程被中斷,當前線程能夠是咱們的一個主線程。 二、CountDownlatch提供了countDown()方法,在子線程執行完後進行操做,遞減鎖存器的計數,若是計數到達0,則喚醒全部等待的線程(咱們的主線程)。 說完我拿起筆刷刷的寫起來:
public class CountDownLatchTest {
  private static final Integer STUDENT_COUNT = 10;   private static CountDownLatch countDownLatch = new CountDownLatch(STUDENT_COUNT);   static class TeacherThread implements Runnable {  @Override  public void run() {  System.out.println("老師來了,等"+ STUDENT_COUNT+"位同窗都到教室了纔開始上課");  try {  countDownLatch.await();  } catch (InterruptedException e) {  e.printStackTrace();  }  System.out.println(STUDENT_COUNT+"位同窗都到齊了,開始上課!");  }  }   static class StudentThread implements Runnable {  @Override  public void run() {  System.out.println(Thread.currentThread().getName()+"進了教室");  countDownLatch.countDown();  }  }   public static void main(String [] args) {  Thread teacher = new Thread(new TeacherThread());  teacher.start();  for (int i=0; i<STUDENT_COUNT; i++) {  Thread student = new Thread(new StudentThread());  student.start();  }  } } 複製代碼
  • 面試官:很好。懂得活學活用。你瞭解信號量Semaphore嗎?
  • 我: 信號量Semaphore是一個控制訪問多個共享資源的計數器,和CountDownLatch同樣,其本質上是一個「共享鎖」。在API是這麼介紹信號量的:一個計數信號量,從概念上講,信號量維護了一個許可集。
    一、若有必要,在許可可用前會阻塞每個acquire,而後再獲取該許可。
    二、每一個release添加一個許可,從而可能釋放一個正在阻塞的獲取者。可是不使用實際的許可對象,Semaphore只對可用許可的號碼進行計數,並採起相應的行動。
    下面以一個停車場的例子來闡述Semaphore:
    一、假設停車場有5個停車位,一開始車位都空着,而後前後來了三輛車,車位夠,安排進去停車,而後又來三輛,這個時候因爲只有兩個車位,因此只能停兩輛,有一輛須要在外面候着,直到 停車場有空位。
    二、從程序角度講,停車場就至關於信號量Semaphore,其中許可數爲5,車輛至關於線程,當來一輛車,許可數就會減1。當停車場沒車位了(許可數==0),其餘來的車輛必須等待。若是 有一輛車開車停車場,則許可數+1,而後放進來一輛車。

從上面的分析能夠看出:信號量Semaphore是一個非負整數(>=1)。當一個線程想要訪問某個共享資源時,它必須先獲取Semaphore。當Semaphore>0時,獲取該資源並使Semaphore-1。 若是Semaphore的值==0,則表示所有的共享資源已經被線程所有佔用,新來的線程必須等待其餘線程釋放資源。當線程釋放資源時,Semaphore則+1。

  • 面試官:你能用Semaphore實現這個停車的例子嗎?
  • 我(又刷刷的寫起來)
public class SemaphoreTest {
  static class Parking {   private Semaphore semaphore;   Parking(int count) {  semaphore = new Semaphore(count);  }   public void park() {  try {  //獲取信號量  semaphore.acquire();  long time = (long) (Math.random()*10+1);  System.out.println(Thread.currentThread().getName()+"進入停車場停車,停車時間:"+time+"秒");  //模擬停車時間  Thread.sleep(time);  System.out.println(Thread.currentThread().getName()+"開出停車場...");  } catch (InterruptedException e) {  e.printStackTrace();  } finally {  //釋放信號量(跟lock的用法差很少)  semaphore.release();  }  }  }   static class Car implements Runnable{   private Parking parking;   Car(Parking parking) {  this.parking = parking;  }   /**  * 每輛車至關於一個線程,線程的任務就是停車  */  @Override  public void run() {  parking.park();  }  }   public static void main(String [] args) {  //假設有3個停車位  Parking parking = new Parking(3);   //這時候同時來了5輛車,只有3輛車能夠進去停車,其他2輛車須要等待有空餘車位以後才能進去停車。  for (int i=0; i<5; i++) {  Thread thread = new Thread(new Car(parking));  thread.start();  }  } } 複製代碼

運行結果:

  • 面試官:很好,那我再問你一個,Exchanger交換器知道不?
  • 我: Exchanger是一個同步器,字面上就能夠看出這個類的主要做用是交換數據。Exchanger有點相似CyclicBarrier,前面說到CyclicBarrier是一個柵欄,到達柵欄的 線程須要等待必定數量的線程到達後,才能經過柵欄。Exchanger能夠當作是一個雙向的柵欄。線程1到達柵欄後,會首先觀察有沒有其餘線程已經到達柵欄,若是沒有就會等待。 若是已經有其餘線程(好比線程2)到達了,就會以成對的方式交換各自攜帶的信息,所以Exchanger很是適合兩個線程之間的數據交換。 以下圖:
  • 面試官:那你能跟我舉個例子說下Exchanger怎麼用嗎?
  • 我:固然能夠。
public class ExchangerTest {
  static class ThreadA implements Runnable {  private Exchanger<String> exchanger;   ThreadA (Exchanger<String> exchanger) {  this.exchanger = exchanger;  }   @Override  public void run() {  try {  //模擬業務代碼  Long time = (long)(Math.random()*10+1)*10;  System.out.println("線程A等待了"+time+"秒");  Thread.sleep(time);  //線程間數據交換  System.out.println("在線程A獲得線程B的值:"+ exchanger.exchange("我是線程A"));  } catch (InterruptedException e) {  e.printStackTrace();  }  }  }   static class ThreadB implements Runnable {  private Exchanger<String> exchanger;   ThreadB(Exchanger<String> exchanger) {  this.exchanger = exchanger;  }   @Override  public void run() {  try {  //模擬業務代碼  Long time = (long)(Math.random()*10+1)*10;  System.out.println("線程B等待了"+time+"秒");  Thread.sleep(time);  //線程間數據交換  System.out.println("在線程B獲得線程A的值:"+ exchanger.exchange("我是線程B"));  } catch (InterruptedException e) {  e.printStackTrace();  }  }  }   public static void main(String [] args) {  Exchanger<String> exchanger = new Exchanger<>();  //線程A和線程B要使用同一個exchanger纔有用  Thread threadA = new Thread(new ThreadA(exchanger));  Thread threadB = new Thread(new ThreadB(exchanger));  threadA.start();  threadB.start();  } } 複製代碼

運行結果:

往期精彩回顧

今天面試了嗎系列
Java併發編程系列:
https://juejin.im/post/5ef764985188252e7c21ad5c
https://juejin.im/post/5ee82c736fb9a047fe5c1af3
redis:
https://juejin.im/post/5dccf260f265da0bf66b626d
spring:
https://juejin.im/post/5e6d993cf265da575b1bd4af
mybatis:
https://juejin.im/post/5e80b6d76fb9a03c3f1e92a2#comment

數據庫系列
mysql索引:
https://juejin.im/post/5d67702cf265da03f333664c
數據庫鎖:
https://juejin.im/post/5dbbc1d06fb9a0205425309e
分庫分表:
https://juejin.im/post/5dc77a9451882559465e390b
數據庫事務:
https://juejin.im/post/5dcb9c38f265da4d2971038c

線上問題系列
https://juejin.im/post/5ef055b7e51d45740e4275e2

java基礎 https://juejin.im/post/5d5e2616f265da03b638b28a https://juejin.im/post/5d427f306fb9a06b122f1b94

相關文章
相關標籤/搜索