java併發編程 -CountDownLatch和CyclicBarrier在內部實現和場景上的區別

前言java

CountDownLatch和CyclicBarrier兩個同爲java併發編程的重要工具類,它們在諸多多線程併發或並行場景中獲得了普遍的應用。但二者就其內部實現和使用場景而言是各有所側重的。編程


內部實現差別安全

前者更多依賴經典的AQS機制和CAS機制來控制器內部狀態的更迭和計數器自己的變化,然後者更多依靠可重入Lock等機制來控制其內部併發安全性和一致性。多線程

 public class  {
     //Synchronization control For CountDownLatch.
     //Uses AQS state to represent count.
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;
    ... ...//
 }
 public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    ... ... //
 }


實戰 - 展現各自的使用場景併發


/**
 *類說明:共5個初始化子線程,6個閉鎖釦除點,扣除完畢後,主線程和業務線程才能繼續執行
 */
public class UseCountDownLatch {
   
    static CountDownLatch latch = new CountDownLatch(6);

    /*初始化線程*/
    private static class InitThread implements Runnable{

        public void run() {
           System.out.println("Thread_"+Thread.currentThread().getId()
                 +" ready init work......");
            latch.countDown();
            for(int i =0;i<2;i++) {
               System.out.println("Thread_"+Thread.currentThread().getId()
                     +" ........continue do its work");
            }
        }
    }

    /*業務線程等待latch的計數器爲0完成*/
    private static class BusiThread implements Runnable{

        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for(int i =0;i<3;i++) {
               System.out.println("BusiThread_"+Thread.currentThread().getId()
                     +" do business-----");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            public void run() {
               SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                     +" ready init work step 1st......");
                latch.countDown();
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                     +" ready init work step 2nd......");
                latch.countDown();
            }
        }).start();
        new Thread(new BusiThread()).start();
        for(int i=0;i<=3;i++){
            Thread thread = new Thread(new InitThread());
            thread.start();
        }
        latch.await();
        System.out.println("Main do ites work........");
    }
}
/**
 *類說明:共4個子線程,他們所有完成工做後,交出本身結果,
 *再被統一釋放去作本身的事情,而交出的結果被另外的線程拿來拼接字符串
 */
class UseCyclicBarrier {
    private static CyclicBarrier barrier
            = new CyclicBarrier(4,new CollectThread());

    //存放子線程工做結果的容器
    private static ConcurrentHashMap<String,Long> resultMap
            = new ConcurrentHashMap<String,Long>();

    public static void main(String[] args) {
        for(int i=0;i<4;i++){
            Thread thread = new Thread(new SubThread());
            thread.start();
        }

    }

    /*彙總的任務*/
    private static class CollectThread implements Runnable{

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
               result.append("["+workResult.getValue()+"]");
            }
            System.out.println(" the result = "+ result);
            System.out.println("do other business........");
        }
    }

    /*相互等待的子線程*/
    private static class SubThread implements Runnable{
        @Override
        public void run() {
           long id = Thread.currentThread().getId();
            resultMap.put(Thread.currentThread().getId()+"",id);
            try {
                   Thread.sleep(1000+id);
                   System.out.println("Thread_"+id+" ....do something ");
                barrier.await();
               Thread.sleep(1000+id);
                System.out.println("Thread_"+id+" ....do its business ");
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}


 二者總結app

1. Cyclicbarrier結果彙總的Runable線程能夠重複被執行,經過屢次觸發await()方法,countdownlatch能夠調用await()方法屢次;cyclicbarrier若沒有結果彙總,則調用一次await()就夠了;框架

2. New cyclicbarrier(threadCount)的線程數必須與實際的用戶線程數一致;ide

3. 協調線程同時運行:countDownLatch協調工做線程執行,是由外面線程協調;cyclicbarrier是由工做線程之間相互協調運行;函數

4. 從構造函數上看出:countDownlatch控制運行的計數器數量和線程數沒有關係;cyclicbarrier構造中傳入的線程數等於實際執行線程數;工具

5. countDownLatch在不能基於執行子線程的運行結果作處理,而cyclicbarrier能夠;

6.     就使用場景而言,countdownlatch 更適用於框架加載前的一系列初始化工做等場景; cyclicbarrier更適用於須要多個用戶線程執行後,將運行結果彙總再計算等典型場景;

相關文章
相關標籤/搜索