面試官:你用過JUC的哪些工具類?
前面從基礎開始,到線程安全的實現、對象的發佈與共享,涉及到不少線程安全的類與工具,JDK1.5開始,提供了更加方便強大的線程同步管理工具包JUC讓咱們使用,這個也是面試與實踐中的重點,本文結合源代碼做一些比較落地的講解。html
imagejava
報告面試官,JUC中有很是多的類,將部分類按功能進行分類,分別是:node
AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其餘同步組件的基礎框架,它是JUC併發包中的核心基礎組件。git
image.png程序員
同步隊列:AQS經過內置的FIFO同步隊列來完成資源獲取線程的排隊工做,若是當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。
繼承實現:AQS的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法acquire/release來管理同步狀態。
同步狀態維護:AQS使用一個int類型的成員變量state來表示同步狀態,當state > 0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操做,固然AQS能夠確保對state的操做是安全的。github
計數器閉鎖是一個能阻塞主線程,讓其餘線程知足特定條件下主線程再繼續執行的線程同步工具。面試
public class CountDownLatchTest { private static final int COUNT = 1000; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(COUNT); for (int i = 0; i < COUNT; i++) { //countDown方法的執行次數必定要與countDownLatch的計數器數量一致,不然沒法將計數器清空致使主線程沒法繼續執行 int finalI = i; executorService.execute(() -> { try { Thread.sleep(3000); System.out.println(finalI); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(1, TimeUnit.SECONDS); //主線程只等1秒,超過以後繼續執行主線程 executorService.shutdown(); //當正在執行的線程執行完成以後再關閉而不是當即中止線程 System.out.println("done!"); } }
這段程序先設置CountDownLatch爲100,而後在其餘線程中調用100次countDown方法,隨後主程序在等待100次被執行完成以後,繼續執行主線程代碼算法
image.png數據庫
圖中,A爲主線程,A首先設置計數器的數到AQS的state中,當調用await方法以後,A線程阻塞,隨後每次其餘線程調用countDown的時候,將state減1,直到計數器爲0的時候,A線程繼續執行。編程
1.並行計算:把任務分配給不一樣線程以後須要等待全部線程計算完成以後主線程才能彙總獲得最終結果
2.模擬併發:能夠做爲併發次數的統計變量,當任意多個線程執行完成併發任務以後統計一次便可
信號量是一個能阻塞線程且能控制統一時間請求的併發量的工具。好比能保證同時執行的線程最多200個,模擬出穩定的併發量。
public class CountDownLatchTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); Semaphore semaphore = new Semaphore(3); //配置只能發佈3個運行許可證 for (int i = 0; i < 100; i++) { int finalI = i; executorService.execute(() -> { try { semaphore.acquire(3); //獲取3個運行許可,若是獲取不到會一直等待,使用tryAcquire則不會等待 Thread.sleep(1000); System.out.println(finalI); semaphore.release(3); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
因爲同時獲取3個許可,因此有即便開啓了100個線程,可是每秒只能執行一個任務
new Semaphore(3)傳入的3就是AQS中state的值,也是許可數的總數,在調用acquire時,檢測此時許可數若是小於0,就將被阻塞,而後將線程構建Node進入AQS隊列
//AQS的骨架,其中tryAcquireShared將調用到Semaphore中的nonfairTryAcquireShared //通常經常使用非公平的信號量,非公平信號量是指在獲取許可時直接循環獲取,若是獲取失敗,纔會入列 //公平的信號量在獲取許可時首先要查看等待隊列中是否已有線程,若是有則將線程入列等待 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // 若是remaining小於0,許可獲取失敗,執行shouldParkAfterFailedAcquire方法入列而後等待 // 若是remaining大於0,許可獲取成功,且更新state成功,那麼則setHeadAndPropagate而且當即返回 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
數據庫鏈接併發數,若是超過併發數,等待(acqiure)或者拋出異常(tryAcquire)
可讓一組線程相互等待,當每一個線程都準備好以後,全部線程才繼續執行的工具類
public class CyclicBarrierTest { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { System.out.println("ready done callback!"); }); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 100; i++) { int finalI = i; Thread.sleep(1000); executorService.execute(() -> { try { System.out.println(finalI + "ready!"); cyclicBarrier.await(); // cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); // 若是某個線程等待超過2秒就報錯 System.out.println(finalI + "go!"); } catch (Exception e) { e.printStackTrace(); } }); } } }
image.png
與CountDownLatch相似,都是經過計數器實現的,當某個線程調用await以後,計數器減1,當計數器大於0時將等待的線程包裝成AQS的Node放入等待隊列中,當計數器爲0時將等待隊列中的Node拿出來執行。
與CountDownLatch的區別:
1.CDL是一個線程等其餘線程,CB是多個線程相互等待
2.CB的計數器能重複使用,調用屢次
1.CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的應用場景。好比咱們用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個賬戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。
2.有四個遊戲玩家玩遊戲,遊戲有三個關卡,每一個關卡必需要全部玩家都到達後才能容許經過。其實這個場景裏的玩家中若是有玩家A先到了關卡1,他必須等到其餘全部玩家都到達關卡1時才能經過,也就是說線程之間須要相互等待。
名爲可重入鎖,其實synchronized也可重入,是JDK層級上的一個併發控制工具
public class ConcurrencyTest { private static final int THREAD_COUNT = 5000; private static final int CONCURRENT_COUNT = 200; private static int count = 0; private static Lock lock = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); Semaphore semaphore = new Semaphore(CONCURRENT_COUNT); CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT); for (int i = 0; i < THREAD_COUNT; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(count); } private static void add() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
參考:https://www.jianshu.com/p/fe027772e156
// 以公平鎖爲例,從lock.lock()開始研究 final void lock() { acquire(1);} public final void acquire(int arg) { if (!tryAcquire(arg) && // 首先經過公平或者非公平方式嘗試獲取鎖 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 而後構建一個Node放入隊列中並等待執行的時機 selfInterrupt(); } // 公平鎖設置鎖執行狀態的邏輯 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //若是state是0,就是當前的鎖沒有人佔有 if (!hasQueuedPredecessors() && // 公平鎖的核心邏輯,判斷隊列是否有排在前面的線程在等待鎖,非公平鎖就沒這個條件判斷 compareAndSetState(0, acquires)) { // 若是隊列沒有前面的線程,使用CAS的方式修改state setExclusiveOwnerThread(current); // 將線程記錄爲獨佔鎖的線程 return true; } } else if (current == getExclusiveOwnerThread()) { // 由於ReentrantLock是可重入的,線程能夠不停地lock來增長state的值,對應地須要unlock來解鎖,直到state爲零 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 接下來要執行的acquireQueued以下 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 再次使用公平鎖邏輯判斷是否將Node做爲頭結點當即執行 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
1.用法:synchronized既能夠很方便的加在方法上,也能夠加載特定代碼塊上,而lock須要顯示地指定起始位置和終止位置。
2.實現:synchronized是依賴於JVM實現的,而ReentrantLock是JDK實現的
3.性能:synchronized和lock其實已經相差無幾,其底層實現已經差很少了。可是若是你是Android開發者,使用synchronized仍是須要考慮其性能差距的。
4.功能:ReentrantLock功能更強大。
4.1 ReentrantLock能夠指定是公平鎖仍是非公平鎖,而synchronized只能是非公平鎖,所謂的公平鎖就是先等待的線程先得到鎖
4.2 ReentrantLock提供了一個Condition(條件)類,用來實現分組喚醒須要喚醒的線程們,而不是像synchronized要麼隨機喚醒一個線程要麼喚醒所有線程
4.3 ReentrantLock提供了一種可以中斷等待鎖的線程的機制,經過lock.lockInterruptibly()來實現這個機制
咱們控制線程同步的時候,優先考慮synchronized,若是有特殊須要,再進一步優化。ReentrantLock若是用的很差,不只不能提升性能,還可能帶來災難。
條件對象的意義在於對於一個已經獲取鎖的線程,若是還須要等待其餘條件才能繼續執行的狀況下,纔會使用Condition條件對象。
與ReentrantLock結合使用,相似wait與notify。
public class ConditionTest { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Thread thread1 = new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " run"); System.out.println(Thread.currentThread().getName() + " wait for condition"); try { condition.await(); // 1.將線程1放入到Condition隊列中等待被喚醒,且當即釋放鎖 System.out.println(Thread.currentThread().getName() + " continue"); // 3.線程2執行完畢釋放鎖,此時線程1已經在AQS等待隊列中,則當即執行 } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } } finally { lock.unlock(); } }); Thread thread2 = new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " run"); System.out.println(Thread.currentThread().getName() + " sleep 1 secs"); try { Thread.sleep(1000); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } condition.signalAll(); // 2.線程2得到鎖,signalAll將Condition中的等待隊列所有取出並加入到AQS中 } finally { lock.unlock(); } }); thread1.start(); thread2.start(); } }
輸出結果爲
Thread-0 run Thread-0 wait for condition Thread-1 run Thread-1 sleep 1 secs Thread-0 continue
可參看第一篇中PDF資料中《線程間通訊》一節
不是AQS的子類,可是能拿到線程執行的結果很是有用。
public interface Runnable { public abstract void run(); }
因爲run()方法返回值爲void類型,因此在執行完任務以後沒法返回任何結果
要使用的話直接實現就能夠了
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
泛型接口,call()函數返回的類型就是傳遞進來的V類型,同時能結合lambda使用
要使用的話要結合ExecutorService的以下方法使用
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
FutureTask<V> implements RunnableFuture<V> RunnableFuture<V> extends Runnable, Future<V>
Future是Java 5添加的類,用來描述一個異步計算的結果。你可使用isDone方法檢查計算是否完成,或者使用get阻塞住調用線程,直到計算完成返回結果,你也可使用cancel方法中止任務的執行。
public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> future = executorService.submit(() -> { try { System.out.println("doing"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "done"; }); System.out.println(future.get()); } }
接口畢竟是接口,只能被賦值,不能直接new出來,因此能夠new FutureTask直接來建立Future任務
public class FutureTaskTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); FutureTask<String> futureTask = new FutureTask<>(() -> { System.out.println("doing"); Thread.sleep(1000); return "down"; }); executorService.submit(futureTask); // new Thread(futureTask).start(); System.out.println(futureTask.get()); executorService.shutdown(); } }
但其實在項目中使用到最多的Future類是1.8提供的這個類,由於雖然Future以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞方式獲得任務的結果,阻塞的方式顯然和咱們的異步編程的初衷相違背。Java CompletableFuture 詳解
其實簡單來講,原理就是經過本身維護一套線程同步與等待的機制與線程池去實現這樣的異步任務處理機制,下面的例子是開發中最常常用到的,等待全部任務完成,繼續處理數據的例子。還有異步任務依賴的例子請參看上文鏈接。
public class CompletableFutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> string1Future = CompletableFuture.supplyAsync(() -> { System.out.println("doing string1"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("done string1"); return "string1"; }); CompletableFuture<String> string2Future = CompletableFuture.supplyAsync(() -> { System.out.println("doing string2"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("done string2"); return "string2"; }); CompletableFuture.allOf(string1Future, string2Future).join(); System.out.println(string1Future.get() + "and" + string2Future.get()); } }
假設咱們有若干生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據共享問題。
但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,而且當生產出來的數據累積到必定程度的時候,那麼生產者暫停等待一下(阻塞生產者線程)或者繼續將產品放入隊列中。
然而,在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須去本身控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度
在後文的線程池相關內容中會提到,線程池也使用到了這個工具完成不一樣需求。
使用方式、子類的詳細介紹參看這裏
fork join框架是JDK7中出現的一款高效的工具,Java開發人員能夠經過它充分利用現代服務器上的多處理器。它是專門爲了那些能夠遞歸劃分紅許多子模塊設計的,目的是將全部可用的處理能力用來提高程序的性能。fork join框架一個巨大的優點是它使用了工做竊取算法,能夠完成更多任務的工做線程能夠從其它線程中竊取任務來執行
但這樣會要額外地對任務分派線程進行管理,無形地會增長管理的難度和複雜度,還可能碰到資源競爭致使的同步操做與性能損耗
http://coding.imooc.com/class/195.html
以及其餘超鏈接引用
最近在總結一些針對Java面試相關的知識點,感興趣的朋友能夠一塊兒維護~
地址:https://github.com/xbox1994/2018-Java-Interview
轉載自:Java多線程與高併發(四):java.util.concurrent包
做者:Gallrax 連接:https://www.jianshu.com/p/46728d6bc6b2 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。