Java併發學習之CountDownLatch實現原理及使用姿式

CountDownLatch實現原理及使用姿式

在併發編程的場景中,最多見的一個case是某個任務的執行,須要等到多個線程都執行完畢以後才能夠進行,CountDownLatch能夠很好解決這個問題前端

下面將主要從使用和實現原理兩個方面進行說明,圍繞點以下java

  1. CountDownLatch 是個什麼鬼
  2. 怎麼用(結合case說明)
  3. 底層實現原理(及如何保障功能的正常性)

I. 使用說明

同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待node

比較有意思的是,CountDownLatch並未繼承自其餘的類or接口,在jdk中這樣的類並很少見(多半是我孤陋寡聞)編程

0. 接口定義

在使用以前,得先了解下其定義的幾個方法多線程

// 構造器,必須指定一個大於零的計數
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

// 線程阻塞,直到計數爲0的時候喚醒;能夠響應線程中斷退出阻塞
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 線程阻塞一段時間,若是計數依然不是0,則返回false;不然返回true
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 計數-1
public void countDown() {
    sync.releaseShared(1);
}

// 獲取計數
public long getCount() {
    return sync.getCount();
}

也就幾個接口,基本上都是比較常見的了,須要注意的是不要把 await()Object#wait()方法弄混了,不然就gg思密達了...併發

1. Demo演示

依然以講解 ReentrantLock中的例子來講明,多線程實現累加框架

線程1實現 10加到100
線程2實現 100加到200
線程3實現 線程1和線程2計算結果的和

實現以下oop

public class CountDownLatchDemo {
    private CountDownLatch countDownLatch;

    private int start = 10;
    private int mid = 100;
    private int end = 200;

    private volatile int tmpRes1, tmpRes2;

    private int add(int start, int end) {
        int sum = 0;
        for (int i = start; i <= end; i++) {
            sum += i;
        }
        return sum;
    }


    private int sum(int a, int b) {
        return a + b;
    }

    public void calculate() {
        countDownLatch = new CountDownLatch(2);

        Thread thread1 = new Thread(() -> {
            try {
                // 確保線程3先與1,2執行,因爲countDownLatch計數不爲0而阻塞
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName() + " : 開始執行");
                tmpRes1 = add(start, mid);
                System.out.println(Thread.currentThread().getName() +
                        " : calculate ans: " + tmpRes1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }, "線程1");

        Thread thread2 = new Thread(() -> {
            try {
                // 確保線程3先與1,2執行,因爲countDownLatch計數不爲0而阻塞
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName() + " : 開始執行");
                tmpRes2 = add(mid + 1, end);
                System.out.println(Thread.currentThread().getName() +
                        " : calculate ans: " + tmpRes2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }, "線程2");


        Thread thread3 = new Thread(()-> {
            try {
                System.out.println(Thread.currentThread().getName() + " : 開始執行");
                countDownLatch.await();
                int ans = sum(tmpRes1, tmpRes2);
                System.out.println(Thread.currentThread().getName() +
                        " : calculate ans: " + ans);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "線程3");

        thread3.start();
        thread1.start();
        thread2.start();
    }


    public static void main(String[] args) throws InterruptedException {
        CountDownLatchDemo demo = new CountDownLatchDemo();
        demo.calculate();

        Thread.sleep(1000);
    }
}

輸出學習

線程3 : 開始執行
線程1 : 開始執行
線程2 : 開始執行
線程1 : calculate ans: 5005
線程2 : calculate ans: 15050
線程3 : calculate ans: 20055

看了上面的定義和Demo以後,使用就會簡單一點了,通常流程如ui

  1. 首先是建立實例 CountDownLatch countDown = new CountDownLatch(2)
  2. 須要同步的線程執行完以後,計數-1; countDown.countDown()
  3. 須要等待其餘線程執行完畢以後,再運行的線程,調用 countDown.await()實現阻塞同步

注意

  • 在建立實例是,必須指定初始的計數值,且應大於0
  • 必須有線程中顯示的調用了countDown()計數-1方法;必須有線程顯示調用了 await()方法(沒有這個就沒有必要使用CountDownLatch了)
  • 因爲await()方法會阻塞到計數爲0,若是在代碼邏輯中某個線程漏掉了計數-1,致使最終計數一直大於0,直接致使死鎖了
  • 鑑於上面一點,更多的推薦 await(long, TimeUnit)來替代直接使用await()方法,至少不會形成阻塞死只能重啓的狀況

有興趣的小夥伴能夠對比下這個實現與 《Java併發學習之ReentrantLock的工做原理及使用姿式》中的demo,明顯感受使用CountDownLatch優雅得多(後面有機會介紹用更有意思的Fork/Join來實現累加)

2. 應用場景

前面給了一個demo演示如何用,那這個東西在實際的業務場景中是否會用到呢?

由於確實在一個業務場景中使用到了,否則也就不會單獨撈出這一節...

電商的詳情頁,由衆多的數據拼裝組成,如能夠分紅一下幾個模塊

  • 交易的收發貨地址,銷量
  • 商品的基本信息(標題,圖文詳情之類的)
  • 推薦的商品列表
  • 評價的內容
  • ....

上面的幾個模塊信息,都是從不一樣的服務獲取信息,且彼此沒啥關聯;因此爲了提升響應,徹底能夠作成併發獲取數據,如

  • 線程1獲取交易相關數據
  • 線程2獲取商品基本信息
  • 線程3獲取推薦的信息
  • 線程4獲取評價信息
  • ....

可是最終拼裝數據並返回給前端,須要等到上面的全部信息都獲取完畢以後,才能返回,這個場景就很是的適合 CountDownLatch來作了

  1. 在拼裝完整數據的線程中調用 CountDownLatch#await(long, TimeUnit) 等待全部的模塊信息返回
  2. 每一個模塊信息的獲取,由一個獨立的線程執行;執行完畢以後調用 CountDownLatch#countDown() 進行計數-1

II. 實現原理

同ReentrantLock同樣,依然是藉助AQS的雙端隊列,來實現原子的計數-1,線程阻塞和喚醒

前面《Java併發學習之ReentrantLock的工做原理及使用姿式》 介紹了AQS的結構,方便查看,下面直接貼出

0. AbstractQueuedSynchronizer (簡稱AQS)

AQS是一個用於構建鎖和同步容器的框架。事實上concurrent包內許多類都是基於AQS構建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解決了在實現同步容器時設計的大量細節問題

AQS使用一個FIFO的隊列表示排隊等待鎖的線程,隊列頭節點稱做「哨兵節點」或者「啞節點」,它不與任何線程關聯。其餘的節點與等待線程關聯,每一個節點維護一個等待狀態waitStatus

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    //取值爲 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一
    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    // Link to next node waiting on condition, 
    // or the special value SHARED
    volatile Thread thread;

    Node nextWaiter;
}

1. 計數器的初始化

CountDownLatch內部實現了AQS,並覆蓋了tryAcquireShared()tryReleaseShared()兩個方法,下面說明幹嗎用的

經過前面的使用,清楚了計數器的構造必須指定計數值,這個直接初始化了 AQS內部的state變量

Sync(int count) {
    setState(count);
}

後續的計數-1/判斷是否可用都是基於sate進行的

2. countDown() 計數-1的實現

// 計數-1
public void countDown() {
    sync.releaseShared(1);
}


public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 首先嚐試釋放鎖
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0) //若是計數已經爲0,則返回失敗
            return false;
        int nextc = c-1;
        // 原子操做實現計數-1
        if (compareAndSetState(c, nextc)) 
            return nextc == 0;
    }
}

// 喚醒被阻塞的線程
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 隊列非空,表示有線程被阻塞
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { 
            // 頭結點若是爲SIGNAL,則喚醒頭結點下個節點上關聯的線程,並出隊
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head) // 沒有線程被阻塞,直接跳出
            break;
    }
}

上面截出計數減1的完整調用鏈

  1. 嘗試釋放鎖tryReleaseShared,實現計數-1
  • 若計數已經小於0,則直接返回false
  • 不然執行計數(AQS的state)減一
  • 若減完以後,state==0,表示沒有線程佔用鎖,即釋放成功,而後就須要喚醒被阻塞的線程了
  1. 釋放並喚醒阻塞線程 doReleaseShared
  • 若是隊列爲空,即表示沒有線程被阻塞(也就是說沒有線程調用了 CountDownLatch#wait()方法),直接退出
  • 頭結點若是爲SIGNAL, 則依次喚醒頭結點下個節點上關聯的線程,並出隊

疑問一: 看到這個實現,是否是隻要countDownLatch的計數爲0了,全部被阻塞的線程都會被執行?

改下上面的demo,新增線程4,實現線程2的結果-線程1的結果

public class CountDownLatchDemo {
    
    // ...省略重複
    
    private int sub(int a, int b) {
        return a - b;
    }

    public void calculate() {
        countDownLatch = new CountDownLatch(2);

        Thread thread1 = // ... ;
        Thread thread2 = // ...;
        
        Thread thread3 = new Thread(()-> {
            try {
                System.out.println(Thread.currentThread().getName() + " : 開始執行");
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " : 喚醒");
                Thread.sleep(100); // 確保線程4先執行完相減
                int ans = sum(tmpRes1, tmpRes2);
                System.out.println(Thread.currentThread().getName() +
                        " : calculate ans: " + ans);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "線程3");

        Thread thread4 = new Thread(()-> {
            try {
                System.out.println(Thread.currentThread().getName() + " : 開始執行");
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " : 喚醒");
                int ans = sub(tmpRes2, tmpRes1);
                Thread.sleep(200); // 保證線程3先輸出執行結果,以驗證線程3和線程4是否併發執行
                System.out.println(Thread.currentThread().getName() +
                        " : calculate ans: " + ans);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "線程4");
        
        thread3.start();
        thread4.start();
        thread1.start();
        thread2.start();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatchDemo demo = new CountDownLatchDemo();
        demo.calculate();

        Thread.sleep(1000);
    }
}

輸出以下

線程4 : 開始執行
線程3 : 開始執行
線程2 : 開始執行
線程2 : calculate ans: 15050
線程1 : 開始執行
線程1 : calculate ans: 5005
線程3 : 喚醒
線程4 : 喚醒
線程3 : calculate ans: 20055
線程4 : calculate ans: 10045

上面的實現中,線程3中sleep一段時間,確保線程4的計算會優先執行;線程4計算完成以後的sleep時間,以保證線程3計算完成並輸出結果,而後線程4才輸出結果;結合輸出,這個指望是準確的,也就是說,線程3和線程4被喚醒後是併發執行的,沒有前後阻塞順序

即CountDownLatch計數爲0以後,全部被阻塞的線程都會被喚醒,且彼此相對獨立,不會出現獨佔鎖阻塞的問題

3. await() 阻塞等待計數爲0

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
    

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) // 若線程中端,直接拋異常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


// 計數爲0時,表示獲取鎖成功
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// 阻塞,併入隊
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); // 入隊
    boolean failed = true;
    try {
        for (;;) {
            // 獲取前驅節點
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 獲取鎖成功,設置隊列頭爲node節點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) // 線程掛起
              && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

阻塞的邏輯相對簡單

  1. 判斷state計數是否爲0,不是,則直接放過執行後面的代碼
  2. 大於0,則表示須要阻塞等待計數爲0
  3. 當前線程封裝Node對象,進入阻塞隊列
  4. 而後就是循環嘗試獲取鎖,直到成功(即state爲0)後出隊,繼續執行線程後續代碼

III. 小結

1. 使用注意

  • 在建立實例時,必須指定初始的計數值,且應大於0
  • 必須有線程中顯示的調用了countDown()計數-1方法;必須有線程顯示調用了await()方法(沒有這個就沒有必要使用CountDownLatch了)
  • 因爲await()方法會阻塞到計數爲0,若是在代碼邏輯中某個線程漏掉了計數-1,致使最終計數一直大於0,直接致使死鎖了;
  • 鑑於上面一點,更多的推薦 await(long, TimeUnit)來替代直接使用await()方法,至少不會形成阻塞死只能重啓的狀況
  • 容許多個線程調用await方法,當計數爲0後,全部被阻塞的線程都會被喚醒

2. 實現原理

await內部實現流程:

  1. 判斷state計數是否爲0,不是,則直接放過執行後面的代碼
  2. 大於0,則表示須要阻塞等待計數爲0
  3. 當前線程封裝Node對象,進入阻塞隊列
  4. 而後就是循環嘗試獲取鎖,直到成功(即state爲0)後出隊,繼續執行線程後續代碼

countDown內部實現流程:

  1. 嘗試釋放鎖tryReleaseShared,實現計數-1
  • 若計數已經小於0,則直接返回false
  • 不然執行計數(AQS的state)減一
  • 若減完以後,state==0,表示沒有線程佔用鎖,即釋放成功,而後就須要喚醒被阻塞的線程了
  1. 釋放並喚醒阻塞線程 doReleaseShared
  • 若是隊列爲空,即表示沒有線程被阻塞(也就是說沒有線程調用了 CountDownLatch#wait()方法),直接退出
  • 頭結點若是爲SIGNAL, 則依次喚醒頭結點下個節點上關聯的線程,並出隊

掃描關注,java分享

https://static.oschina.net/uploads/img/201710/13203703_6IVg.jpg

相關文章
相關標籤/搜索