【Java併發工具類】CountDownLatch和CyclicBarrier

前言

下面介紹協調讓多線程步調一致的兩個工具類:CountDownLatchCyclicBarrierhtml

CountDownLatch和CyclicBarrier的用途介紹

CountDownLatch

// API
 void       await(); // 使當前線程在閉鎖計數器到零以前一直等待,除非線程被中斷。
 boolean    await(long timeout, TimeUnit unit); // 使當前線程在閉鎖計數器至零以前一直等待,除非線程被中斷或超出了指定的等待時間。
 void       countDown(); // 遞減閉鎖計數器,若是計數到達零,則釋放全部等待的線程。
 long       getCount(); // 返回當前計數。
 String     toString(); // 返回標識此閉鎖及其狀態的字符串。

CountDownLatch是一個同步工具類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。能夠指定計數初始化CountDownLatch,當調用countDown()方法後,在當前計數到達零以前,await()方法會一直受阻塞。計數到達零以後,全部被阻塞的線程都會被釋放,await()的全部後續調用都會當即返回。CountDownLatch的計數只能被使用一次,若是須要重複計數使用,則要考慮使用CyclicBarrierjava

CountDownLatch的用途有不少。將計數爲1初始化的CountDownLatch可用做一個簡單的開/關或入口:在經過調用countDown()的線程打開入口前,全部調用await()的線程都一直在入口出等待。而用N初始化CountDownLatch可使一個線程在N個線程完成某項操做以前一直等待,或者使其在某項操做完成N次以前一直等待。算法

COuntDownLatch的內存一致性語義:線程中調用 countDown() 以前的操做 Happens-Before緊跟在從另外一個線程中對應 await() 成功返回的操做。編程

CyclicBarrier

// API
 int        await(); // 線程將一直等待直到全部參與者都在此 barrier 上調用 await 方法
 int        await(long timeout, TimeUnit unit); // 線程將一直等待直到全部參與者都在此 barrier 上調用 await 方法, 或者超出了指定的等待時間。
 int        getNumberWaiting(); // 返回當前在屏障處等待的參與者數目。
 int        getParties(); // 返回要求啓動此 barrier 的參與者數目。
 boolean    isBroken(); // 查詢此屏障是否處於損壞狀態。
 void       reset(); // 將屏障重置爲其初始狀態。

CyclicBarrier是一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點(barrier也可被翻譯爲柵欄) (common barrier point)。 CyclicBarrier 適用於在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待的狀況。即全部線程都必須到達屏障位置後,下面的程序才能繼續執行,適於在迭代算法中使用。由於 barrier 在釋放等待線程後能夠計數器會被重置可繼續使用,因此稱它爲循環 的 barrier。api

CyclicBarrier支持一個可選的 Runnable命令(也就是能夠傳入一個線程執行其餘操做),在一組線程中的最後一個線程到達以後(但在釋放全部線程以前),該命令將只在每一個 barrier point 運行一次。這對全部參與線程繼續運行以前更新它們的共享狀態將十分有用。多線程

CyclicBarrier的內存一致性語義:線程中調用 await() 以前的操做 Happens-Before 那些是屏障操做的一部份的操做,後者依次 Happens-Before 緊跟在從另外一個線程中對應 await() 成功返回的操做。併發

Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads.

在對帳系統中使用CountDownLatch和CyclicBarrier

對帳系統流程圖以下:oracle

image-20200220174328582

目前對帳系統的處理流程是:先查詢訂單,而後查詢派送單,以後對比訂單和派送單,將差別寫入差別庫。對帳系統的代碼抽象後以下:app

while(存在未對帳訂單){
    // 查詢未對帳訂單
    pos = getPOrders();
    // 查詢派送單
    dos = getDOrders();
    // 執行對帳操做
    diff = check(pos, dos);
    // 差別寫入差別庫
    save(diff);
}

利用並行優化對帳系統

目前的對帳系統,因爲訂單量和派送單量巨大,因此查詢未對帳訂單getPOrder()和查詢派送單getDOrder()都相對比較慢。目前對帳系統是單線程執行的,示意圖以下(圖來自參考[1]):函數

image-20200220183528662

對於串行化的系統,優化性能首先想到的就是可否利用多線程並行處理
若是咱們能將getPOrders()和getDOrders()這兩個操做並行處理,那麼將會提高效率不少。由於這兩個操做並無前後順序的依賴,因此,咱們能夠並行處理這兩個耗時的操做。
並行後的示意圖以下(圖來自參考[1]):

image-20200220183728865

對比單線程的執行示意圖,咱們發如今同等時間裏,並行執行的吞吐量近乎單線程的2倍,優化效果仍是相對明顯的。

優化後的代碼以下:

while(存在未對帳訂單){
    // 查詢未對帳訂單
    Thread T1 = new Thread(()->{
        pos = getPOrders();
    });
    T1.start();

    // 查詢派送單
    Thread T2 = new Thread(()->{
        dos = getDOrders();
    });
    T2.start();

    // 要等待線程T1和T2執行完才能執行check()和save()這兩個操做
    // 經過調用T1.join()和T2.join()來實現等待
    // 當T2和T2線程退出時,調用T1.jion()和T2.join()的主線程就會從阻塞態被喚醒,從而執行check()和save()
    T1.join();
    T2.join();

    // 執行對帳操做
    diff = check(pos, dos);
    // 差別寫入差別庫
    save(diff);
}

使用CountDownLatch實現線程等待

上面的解決方案美中不足的地方在於:每一次while循環都會建立新的線程,而線程的建立是一個耗時操做。因此,最好能使建立出來的線程可以循環使用。一個天然而然的方案即是線程池。

// 建立 2 個線程的線程池
Executor executor =Executors.newFixedThreadPool(2);
while(存在未對帳訂單){
    // 查詢未對帳訂單
    executor.execute(()-> {
        pos = getPOrders();
    });

    // 查詢派送單
    executor.execute(()-> {
        dos = getDOrders();
    });

    /* ??如何實現等待??*/

    // 執行對帳操做
    diff = check(pos, dos);
    // 差別寫入差別庫
    save(diff);
}   

因而咱們就建立兩個固定大小爲2的線程池,以後在while循環裏重複利用。
可是問題也出來了:主線程如何得知getPOrders()和getDOrders()這兩個操做何時執完?
前面主線程經過調用線程T1和T2的join()方法來等待T1和T2退出,可是在線程池的方案裏,線程根本就不會退出,因此,join()方法不可取。

這時咱們就可使用CountDownLatch工具類,將其初始計數值設置爲2。當執行完pos = getPOrders();後,將計數器減一,執行完dos = getDOrders();後也將計數器減一。當計數器爲0時,被阻塞的主線程就能夠繼續執行了。

// 建立 2 個線程的線程池
Executor executor = Executors.newFixedThreadPool(2);

while(存在未對帳訂單){
    // 計數器初始化爲 2
    CountDownLatch latch = new CountDownLatch(2);
    // 查詢未對帳訂單
    executor.execute(()-> {
        pos = getPOrders();
        latch.countDown();    // 實現對計數器減1
    });

    // 查詢派送單
    executor.execute(()-> {
        dos = getDOrders();
        latch.countDown();    // 實現對計數器減1
    });

    // 等待兩個查詢操做結束
    latch.await(); // 在await()返回以前,主線程會一直被阻塞

    // 執行對帳操做
    diff = check(pos, dos);
    // 差別寫入差別庫
    save(diff);
}

使用CyclicBarrier進一步優化對帳系統

除了getPOrders()和getDOrders()這兩個操做能夠並行,這兩個查詢操做和check()save()這兩個對帳操做之間也能夠並行。

image-20200220185636511

兩次查詢操做和對帳操做並行,對帳操做還依賴查詢操做的結果,有點像生產者-消費者的意思,兩次查詢操做是生產者,對帳操做是消費者。那麼,咱們就須要一個隊列,來保存生產者生產的數據,而消費者則從這個隊列消費數據。

不過,針對對帳系統,能夠設計兩個隊列,而且這兩個隊列之間還有對應關係。訂單查詢操做將訂單查詢結果插入訂單隊列,派送單查詢操做將派送單插入派送單隊列,這兩個隊列的元素之間是有一一對應關係。這樣的好處在於:對帳操做能夠每次從訂單隊列出一個元素和從派送單隊列出一個元素,而後對這兩個元素執行對帳操做,這樣數據必定不會亂掉。

image-20200220190145881

如何使兩個隊列實現徹底的並行?
兩個查詢操做所需時間並不相同,那麼一個簡單的想法即是,一個線程T1執行訂單的查詢工程,一個線程T2執行派送單的查詢工做,僅當線程T1和T2各自都生產完1條數據的時候,通知線程T3執行對帳操做。

image-20200220190749565

先查詢完的一方須要在設置的屏障點等待另外一方,直到雙方都到達屏障點,纔開始繼續下一步任務。

因而咱們可使用CyclicBarrier來實現這個功能。建立一個計數器初始值爲2的CyclicBarrier,同時傳入一個回調函數,當計數器減爲0的時候,便調用這個函數。

Vector<P> pos; // 訂單隊列
Vector<D> dos; // 派送單隊列
// 執行回調的線程池 
// 固定線程數量爲1是由於只有單線程取獲取兩個隊列中的數據纔不會出現數據匹配不一致問題
Executor executor = Executors.newFixedThreadPool(1); 
// 建立CyclicBarrier的計數器爲2,傳入一個線程另外執行對帳操做
// 當計數器爲0時,會運行傳入線程執行對帳操做
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
                                    executor.execute(()->check());
                             });
void check(){
    P p = pos.remove(0); // 從訂單隊列中獲取訂單
    D d = dos.remove(0); // 從派送單隊列中獲取派送單
    // 執行對帳操做
    diff = check(p, d);
    // 差別寫入差別庫
    save(diff);
}

void checkAll(){
    // 循環查詢訂單庫
    Thread T1 = new Thread(()->{
        while(存在未對帳訂單){
            pos.add(getPOrders()); // 查詢訂單庫
            barrier.await(); // 將計數器減一併等待直到計數器爲0
        }
    });
    T1.start();  
    // 循環查詢運單庫
    Thread T2 = new Thread(()->{
        while(存在未對帳訂單){
            dos.add(getDOrders()); // 查詢運單庫
            barrier.await(); // 將計數器減一併等待直到計數器爲0
        }
    });
    T2.start();
}

線程T1負責查詢訂單,當查出一條時,調用barrier.await()來將計數器減1,同時等待計數器變爲0;線程T2負責查詢派送訂單,當查出一條時,也調用barrier.await()來將計數器減1,同時等待計數器變爲0;當T1和T2都調用barrier.await()時,計數器就會減到0,此時T1和T2就能夠執行下一條語句了,同時會調用barrier的回調函數來執行對帳操做。

CyclicBarrier的計數器有自動重置的功能,當減到0時,會自動重置你設置的初始值。因而,咱們即可以重複使用CyclicBarrier。

小結

CountDownLatchCyclicBarrier是Java併發包提供的兩個很是易用的線程同步工具類。它們的區別在於:CountDownLatch主要用來解決一個線程等待多個線程的場景(計數器一旦減到0,再有線程調用await(),該線程會直接經過,計數器不會被重置);CyclicBarrier是一組線程之間的相互等待(計數器能夠重用,減到0會重置爲設置的初始值),還能夠傳入回調函數,當計數器爲0時,執行回調函數。

參考: [1] 極客時間專欄王寶令《Java併發編程實戰》 [2] Brian Goetz.Tim Peierls. et al.Java併發編程實戰[M].北京:機械工業出版社,2016 [3] Oracle Java API.https://docs.oracle.com/javase/8/docs/api/index.html?overview-summary.html

相關文章
相關標籤/搜索