多線程學習筆記六之併發工具類CountDownLatch和CyclicBarrier

簡介

  在編寫多線程程序時,不免須要對併發流程進行控制,Thread類有join()和yield()等方法,JUC提供了更爲靈活的併發工具類,下面就學習這些工具類的用法以及實現。api

CountDownLatch

  latch意思是門閂,countdown指從上往下數,CountDownLatch容許一個或多個線程等待其餘任務線程完成操做,就像它的字面意思:從大往小數,數到某個值(0)的時候打開門閂。下面是CountDownLatch的api:多線程

//構造器
    public CountDownLatch(int count);

    //調用await()方法的線程會進入等待狀態,它會等待直到count值爲0才繼續執行
    public void await();  
    //和await()相似,只不過等待必定的時間後count值還沒變爲0的話就會繼續執行
    public boolean await(long timeout, TimeUnit unit);
    //計數器減一
    public void countDown()

能夠看到經過構造器構造一個計數器,經過調用countDown方法計數減少,await在計數器大於0時線程處於等待狀態,經過下面例子能夠學會CountDownLatch的用法:併發

示例

public class LatchTest {
    public static void main(String[] args) {
        //兩個線程,計數器傳入2
        final CountDownLatch latch = new CountDownLatch(2);

        //這兩個線程執行了latch.countDown(),計數器歸0,主線程才被喚醒繼續執行
        new Thread(() -> {
            try {
                System.out.println("子線程1: "+Thread.currentThread().getName()+"正在執行");
                Thread.sleep(3000);
                System.out.println("子線程1: "+Thread.currentThread().getName()+"執行完畢");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                System.out.println("子線程2: "+Thread.currentThread().getName()+"正在執行");
                Thread.sleep(3000);
                System.out.println("子線程2: "+Thread.currentThread().getName()+"執行完畢");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        try {
            System.out.println("等待2個子線程執行完畢...");
            latch.await();
            System.out.println("2個子線程已經執行完畢");
            System.out.println("繼續執行主線程");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:
app

實現分析

  CountDownLatch是基於共享鎖實現的,內部類Sync繼承同步器AQS,重點分析CountDownLatch如下三個方法:ide

構造方法

  經過構造函數傳入的參數count設置同步狀態(count必須大於0,不然拋出異常),同步狀態在這裏並不表示線程得到鎖的重入次數,而是表示一個計數器,計數器的大小與任務線程的數目是一致的,函數

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    Sync(int count) {
        setState(count);
    }

await()

  調用了await的線程會處於等待狀態,直到計數器歸0纔會被喚醒。await方法調用了Sync父類AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly首先檢查線程有中斷,而後調用tryAcquireShared嘗試獲取共享鎖,獲取成功返回1,失敗返回-1,若失敗調用doAcquireSharedInterruptibly將當前線程加入同步隊列阻塞住,等待計數器爲0喚醒。工具

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

countDown()

  countDown方法將計數器減一,調用了AQS的releaseShared方法,當tryReleaseShared方法返回true執行doReleaseShared方法,這個方法在分析讀寫鎖是介紹過了,就是喚醒同步等列等待獲取鎖的線程,即喚醒調用了await方法等待計數器歸0的線程。oop

public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  • tryReleaseShared(int releases)
      經過循環+CAS的方式修改同步狀態state,當同步狀態爲0時返回true;同步狀態爲0,即表示計數器歸0,全部調用了countDown的線程都執行完了,能夠喚醒調用await等待的線程了。
protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

CountDownLatch與Thread.join()

  Thread類的join方法與CountDownLatch做用相似,join方法的實現原理不停檢查調用join的線程是否存活,若是存活則讓當前線程處於等待狀態,當join線程終止後,會喚醒當前線程。CountDownLatch與join相比更靈活,沒必要非得線程停止只要調用了countDown方法就好了,能夠響應中斷以及可以設置超時等功能。學習

CyclicBarrier

  CyclicBarrier是指可循環使用的屏障,它可讓一組線程當他們分別達到了同步點(common barrier point)時被阻塞,直到最後一個線程到達了同步點,屏障纔會開門,讓全部被屏障屏蔽的線程繼續運行。

public class BarrierTest {
    public static void main(String[] args) {
        int size = 4;
        CyclicBarrier barrier  = new CyclicBarrier(size);
        for(int i=0;i<size;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+" is coming...");
            try {
                //睡眠模擬業務操做
                Thread.sleep(5000);      
                System.out.println("線程"+Thread.currentThread().getName()+" is waiting on barrier");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
        }
    }
}

運行結果:

實現分析

類屬性及構造方法

public class CyclicBarrier {
    
    //CyclicBarrier使用完了能夠重置,每使用一次都會有一個新的Generation對象,broken表示當前屏障是否被損壞
    private static class Generation {
        boolean broken = false;
    }

    //重入鎖
    private final ReentrantLock lock = new ReentrantLock();
    //condition實現線程等待與喚醒
    private final Condition trip = lock.newCondition();
    //表示線程數,在parties個線程都調用await方法後,barrier纔算是被經過(tripped)了。
    private final int parties;
    //經過構造方法設置一個Runnable對象,用來在全部線程都到達barrier時執行。
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    //count表示還剩下未到達barrier(未調用await)的線程數量
    private int count;

    //構造函數
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

await()

  await重載的兩種方法都是調用的doWait方法。

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
  • doWait(boolean timed, long nanos)
      doWait是await的核心方法,經過獨佔鎖和Condition對象讓線程阻塞等待,具體先判斷當前線程是否是最後一個執行await方法的線程,若是不是,調用condition的await方法讓線程等待,在這裏咱們看到首先線程會得到鎖,進入同步塊,在循環裏讓線程等待,這裏由於當前線程得到了獨佔鎖,它處於同步隊列的head頭節點之中,當調用了condition.await()方法後,當前線程從同步隊列轉移到條件隊列,釋放了獨佔鎖,因此當前線程獲取獨佔鎖並不會影響後來的線程獲取獨佔鎖,由於當前線程進入阻塞狀態已經釋放了獨佔鎖,直到被喚醒後纔會去爭取得到獨佔鎖,到最後會在finally塊中顯示的釋放。
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        //獨佔鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //Generation對象
            final Generation g = generation;

            //屏障被破壞,拋出異常
            if (g.broken)
                throw new BrokenBarrierException();

            //線程被中斷
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            //最後一個到達同步點的線程
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            //一直循環直到最後一個線程到達同步點、屏障破損(genneration的broken屬性爲true)、中斷或超時
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //g == generation && !g.broken說明此時當前這一輪還沒結束,而且沒有其它線程執行過
                    //breakBarrier方法。這種狀況會執行breakBarrier置generation的broken標識爲true並
                    //喚醒其它線程,以後繼續拋出InterruptedException。 
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // 若是g != generation,此時這一輪已經結束,後面返回index做爲到達barrier的次序;
                        // 若是g.broken說明以前已經有其它線程執行了breakBarrier方法,後面會拋出
                        //BrokenBarrierException。
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                //超時
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
  • breakBarrier()
      損壞當前屏障,會喚醒全部在屏障中的線程,當線程被中斷或等待超時會調用
private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
  • nextGeneration()
      nextGeneration方法在全部線程進入屏障後會被調用,即生成下一個版本,全部線程又能夠從新進入到屏障中
private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

CountDownLatch和CyclicBarrier區別

  從功能上說,CountDownLatch容許一個或多個線程等待其餘線程完成操做,而CyclicBarrier是讓一組線程達到一個公共同步點以後再一塊兒放行;CountDownLatch計數器只能使用一次,CyclicBarrier可使用reset方法重置用以處理某些複雜的業務場景。

相關文章
相關標籤/搜索