併發工具類(一)等待多線程的CountDownLatch

前言

  JDK中爲了處理線程之間的同步問題,除了提供鎖機制以外,還提供了幾個很是有用的併發工具類:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
  CountDownLatch、CyclicBarrier、Semphore、Phaser 這四個工具類提供一種併發流程的控制手段;而Exchanger工具類則提供了在線程之間交換數據的一種手段。html

簡介

  CountDownLatch 容許一個或多個線程等待其餘線程完成操做。單詞Latch的意思是「門閂」,因此沒有打開時,N我的是不能進入屋內的,也就是N個線程是不能往下執行的,從而控制線程執行任務的時機,使線程以「組團」的方式一塊兒執行任務。
  CountDownLatch 類 在建立時,給定一個計數count。線程調用CountDownLatch 對象的awiat( )方法時,判斷這個計數count是否爲0,若是不爲0,就進入等待狀態。其餘線程在完成必定任務時,調用CountDownLatch 的countDown()方法,使計數count減一。直到count的值等於0或者少於0時,即是等待線程的運行時機,將會繼續往下運行。java

CountDownLatch的API接口bash

方法名稱 描 述
void await() 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷。
boolean await(long timeout, TimeUnit unit) 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷或超出了指定的等待時間。
void countDown() 遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。
long getCount() 返回當前計數。
String toString() 返回標識此鎖存器及其狀態的字符串。

注意:
await()也能夠被多個線程同時調用,從而實現多個線程 等待其餘的多個線程完成某部分操做。

下面是API文檔介紹的兩個經典用法:

@ Example1

多線程

Driver類中建立了一組worker 線程,全部的worker線程必須等待Driver類完成初始化動做,才能往下運行。完成初始化動做後,Driver類也必須等待全部worker線程完成才能結束。本例子中使用了兩個CountDownLatch類:併發

  • startSignal是一個啓動信號,在 driver 爲繼續執行 worker 作好準備以前,它會阻止全部的 worker 繼續執行。
  • doneSignal是一個完成信號,它容許 driver 在完成全部 worker 以前一直等待。

class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();      // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }

 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
   }
   public void run() {
      try {
        startSignal.await();
        doWork();
        doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }
複製代碼

@ Example2

:另外一種典型用法是,將一個問題分紅 N 個部分(N個小任務),而後將這些任務Runnable交由線程池來完成,每一個子任務執行完成,就計數一次,主線程則等待這些子任務完成。當全部的子部分完成後,主線程就可以經過 await。(當線程必須用這種方法反覆倒計數時,可改成使用 CyclicBarrier。)
app

class Driver2 { // ...
   void main() throws InterruptedException {
     CountDownLatch doneSignal = new CountDownLatch(N);
     Executor e = ...

     for (int i = 0; i < N; ++i) // create and start threads
       e.execute(new WorkerRunnable(doneSignal, i));

     doneSignal.await();           // wait for all to finish
   }
 }

 class WorkerRunnable implements Runnable {
   private final CountDownLatch doneSignal;
   private final int i;
   WorkerRunnable(CountDownLatch doneSignal, int i) {
      this.doneSignal = doneSignal;
      this.i = i;
   }
   public void run() {
      try {
        doWork(i);
        doneSignal.countDown();
      } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }
複製代碼

應用場景

  假若有這樣一個需求,當咱們須要解析一個Excel裏多個sheet的數據時,能夠考慮使用多線程,每一個線程解析一個sheet裏的數據,等到全部的sheet都解析完以後,程序須要提示解析完成。在這個需求中,要實現主線程等待全部線程完成sheet的解析操做,最簡單的作法是使用join。代碼以下:框架

public class JoinCountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(new Runnable() {
            @Override
            public void run() {
            }
        });

        Thread parser2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("parser2 finish");
            }
        });

        parser1.start();
        parser2.start();
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }

}
複製代碼

join用於讓當前執行線程等待join線程執行結束。其實現原理是不停檢查join線程是否存活,若是join線程存活則讓當前線程永遠wait,代碼片斷以下,wait(0)表示永遠等待下去。
ide

while (isAlive()) {
 wait(0);
}
複製代碼

直到join線程停止後,線程的this.notifyAll會被調用,調用notifyAll是在JVM裏實現的,因此JDK裏看不到,有興趣的同窗能夠看看JVM源碼。JDK不推薦在線程實例上使用wait,notify和notifyAll方法。
而在JDK1.5以後的併發包中提供的函數

CountDownLatch也能夠實現join的這個功能,而且比join的功能更多。

public class CountDownLatchTest {

    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();

        c.await();
        System.out.println("3");
    }

}
複製代碼

CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,這裏就傳入N。工具

當咱們調用一次CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await會阻塞當前線程,直到N變成零。

因爲countDown方法能夠用在任何地方,因此這裏說的N個點,能夠是N個線程,也能夠是1個線程裏的N個執行步驟

。用在多個線程時,你只須要把這個CountDownLatch的引用傳遞到線程裏。

其餘方法:

若是有某個解析sheet的線程處理的比較慢,咱們不可能讓主線程一直等待,因此咱們可使用另一個帶指定時間的await方法,await(long time, TimeUnit unit): 這個方法等待特定時間後,就會再也不阻塞當前線程。join也有相似的方法。

注意:

  • 計數器必須大於等於0,只是等於0時候,計數器就是零,調用await方法時不會阻塞當前線程。CountDownLatch不可能從新初始化或者修改CountDownLatch對象的內部計數器的值。
  • 一個線程調用countDown方法 happen-before 另一個線程調用await方法。

CountDownLatch 的源碼分析

最後,咱們簡單看一下 CountDownLatch是怎麼實現的:

public class CountDownLatch {
 private final Sync sync;

 public CountDownLatch(int count) {//構造器
         //count少於0將拋出異常
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

 public void countDown() {
        sync.releaseShared(1);
    }
//........
}
複製代碼

在建立countDownLatch,其構造器裏面建立了一個sync類,而且await()countDown方法都是都是經過此類來實現的。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
          //設置state的值爲countDownLatch的計數的數目
            setState(count);
        }

        int getCount() {
            return getState();
        }

        //若是state值爲0.也就是計數完成了,就不能夠再獲取共享鎖,這也是爲何CountLatch只能用一次
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
       
       //是否能夠釋放共享鎖
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1; //狀態state減一
                if (compareAndSetState(c, nextc))
                    return nextc == 0;//計數到0了,表示釋放鎖成功。
            }
        }
    }
複製代碼

與大部分的併發工具類同樣,都是繼承使用了JDK提供的強大的AQS框架類AbstractQueuedSynchronizer,並且使用的仍是共享鎖,共享鎖能容許線程進入的線程數目,就是CountDownLatch傳入的參數。


文章源地址:https://www.cnblogs.com/jinggod/p/8492067.html

相關文章
相關標籤/搜索