『併發包入坑指北』之向大佬彙報任務

前言

在面試過程當中聊到併發相關的內容時,很多面試官都喜歡問這類問題:java

當 N 個線程同時完成某項任務時,如何知道他們都已經執行完畢了。git

這也是本次討論的話題之一,因此本篇爲『併發包入坑指北』的第二篇;來聊聊常見的併發工具。github

本身實現

其實這類問題的核心論點都是:如何在一個線程中得知其餘線程是否執行完畢。面試

假設如今有 3 個線程在運行,須要在主線程中得知他們的運行結果;能夠分爲如下幾步:安全

  • 定義一個計數器爲 3。
  • 每一個線程完成任務後計數減一。
  • 一旦計數器減爲 0 則通知等待的線程。

因此也很容易想到能夠利用等待通知機制來實現,和上文的『併發包入坑指北』之阻塞隊列的相似。多線程

按照這個思路自定義了一個 MultipleThreadCountDownKit 工具,構造函數以下:併發

考慮到併發的前提,這個計數器天然須要保證線程安全,因此採用了 AtomicInteger框架

因此在初始化時須要根據線程數量來構建對象。ide

計數器減一

當其中一個業務線程完成後須要將這個計數器減一,直到減爲0爲止。函數

/**
     * 線程完成後計數 -1
     */
    public void countDown(){

        if (counter.get() <= 0){
            return;
        }

        int count = this.counter.decrementAndGet();
        if (count < 0){
            throw new RuntimeException("concurrent error") ;
        }

        if (count == 0){
            synchronized (notify){
                notify.notify();
            }
        }

    }

利用 counter.decrementAndGet() 來保證多線程的原子性,當減爲 0 時則利用等待通知機制來 notify 其餘線程。

等待全部線程完成

而須要知道業務線程執行完畢的其餘線程則須要在未完成以前一直處於等待狀態,直到上文提到的在計數器變爲 0 時獲得通知。

/**
     * 等待全部的線程完成
     * @throws InterruptedException
     */
    public void await() throws InterruptedException {
        synchronized (notify){
            while (counter.get() > 0){
                notify.wait();
            }

            if (notifyListen != null){
                notifyListen.notifyListen();
            }

        }
    }

原理也很簡單,一旦計數器還存在時則會利用 notify 對象進行等待,直到被業務線程喚醒。

同時這裏新增了一個通知接口能夠自定義實現喚醒後的一些業務邏輯,後文會作演示。

併發測試

主要就是這兩個函數,下面來作一個演示。

  • 初始化了三個計數器的併發工具 MultipleThreadCountDownKit
  • 建立了三個線程分別執行業務邏輯,完畢後執行 countDown()
  • 線程 3 休眠了 2s 用於模擬業務耗時。
  • 主線程執行 await() 等待他們三個線程執行完畢。

經過執行結果能夠看出主線程會等待最後一個線程完成後纔會退出;從而達到了主線程等待其他線程的效果。

MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3);
    multipleThreadKit.setNotify(() -> LOGGER.info("三個線程完成了任務"));

也能夠在初始化的時候指定一個回調接口,用於接收業務線程執行完畢後的通知。

固然和在主線程中執行這段邏輯效果是同樣的(和執行 await() 方法處於同一個線程)。

CountDownLatch

固然咱們本身實現的代碼沒有通過大量生產環境的驗證,因此主要的目的仍是嘗試窺探官方的實現原理。

因此咱們如今來看看 juc 下的 CountDownLatch 是如何實現的。

經過構造函數會發現有一個 內部類 Sync,他是繼承於 AbstractQueuedSynchronizer ;這是 Java 併發包中的基礎框架,均可以單獨拿來說了,因此此次重點不是它,從此咱們再着重介紹。

這裏就能夠把他簡單理解爲提供了和上文相似的一個計數器及線程通知工具就好了。

countDown

其實他的核心邏輯和咱們本身實現的區別不大。

public void countDown() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

利用這個內部類的 releaseShared 方法,咱們能夠理解爲他想要將計數器減一。

看到這裏有沒有似曾相識的感受。

沒錯,在 JDK1.7 中的 AtomicInteger 自減就是這樣實現的(利用 CAS 保證了線程安全)。

只是一旦計數器減爲 0 時則會執行 doReleaseShared 喚醒其餘的線程。


這裏咱們只須要關心紅框部分(其餘的暫時不用關心,這裏涉及到了 AQS 中的隊列相關),最終會調用 LockSupport.unpark 來喚醒線程;就至關於上文調用 object.notify()

因此其實本質上仍是相同的。

await

其中的 await() 也是借用 Sync 對象的方法實現的。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //判斷計數器是否還未完成    
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

一旦還存在未完成的線程時,則會調用 doAcquireSharedInterruptibly 進入阻塞狀態。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

一樣的因爲這也是 AQS 中的方法,咱們只須要關心紅框部分;其實最終就是調用了 LockSupport.park 方法,也就至關於執行了 object.wait()

  • 全部的業務線程執行完畢後會在計數器減爲 0 時調用 LockSupport.unpark 來喚醒線程。
  • 等待線程一旦計數器 > 0 時則會利用 LockSupport.park 來等待喚醒。

這樣整個流程也就串起來了,它的使用方法也和上文的相似。

就不作過多介紹了。

實際案例

一樣的來看一個實際案例。

在上一篇《一次分表踩坑實踐的探討》提到了對於全表掃描的狀況下,須要利用多線程來提升查詢效率。

好比咱們這裏分爲了 64 張表,計劃利用 8 個線程來分別處理這些表的數據,僞代碼以下:

CountDownLatch count = new CountDownLatch(64);
ConcurrentHashMap total = new ConcurrentHashMap();
for(Integer i=0;i<=63;i++){
    executor.execute(new Runnable(){
        @Override
        public void run(){
            List value = queryTable(i);
            total.put(value,NULL);
            count.countDown();
        }
    }) ;
    
}

count.await();
System.out.println("查詢完畢");

這樣就能夠實現全部數據都查詢完畢後再作統一彙總;代碼挺簡單,也好理解(固然也可使用線程池的 API)。

總結

CountDownLatch 算是 juc 中一個高頻使用的工具,學會和理解他的使用會幫助咱們更容易編寫併發應用。

文中涉及到的源碼:

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/communication/MultipleThreadCountDownKit.java

你的點贊與分享是對我最大的支持

相關文章
相關標籤/搜索