多線程-AQS-CyclicBarrier

一、CyclicBarrier和CountDownLatch的區別
    CountDownLatch是閉鎖,只能使用一次,而CyclicBarrier的計數器會重置,可使用屢次,因此CyclicBarrier可以處理更爲複雜的場景;java

    CyclicBarrier還提供了一些其餘有用的方法,好比getNumberWaiting()方法能夠得到CyclicBarrier阻塞的線程數量,isBroken()方法用來了解阻塞的線程是否被中斷;
            PS:有一個線程broken了,整組broken;
                CyclicBarrier是基於獨佔鎖和阻塞隊列實現的,因此併發性能在基因上就有缺陷,應對高併發場景時應謹慎考慮是否使用併發

    CountDownLatch容許一個或多個線程等待一組事件完成而繼續,而CyclicBarrier容許一個事件等待一個或多個線程完成而繼續。
    --------------------- app

二、CountDownLatch是使用AQS框架共享鎖實現的同步,隊列採用的sync同步FIFO隊列
     CyclicBarrier是使用AQS框架獨佔鎖實現的同步,隊列採用了condition阻塞block隊列--詳見AQS-condition阻塞隊列
     基於AQS框架的解讀,本次正好將AQS的阻塞隊列的模式補上。始於柵欄,始於源碼
三、源碼:框架

package com.ysma.test;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.*;

/**
 * 在最後一個線程抵達而且其餘線程也都抵達或者broken了的時候,整個阻塞就盤活了,不在阻塞
 * @since 1.5
 * @see CountDownLatch
 * @author Doug Lea  又是這哥們寫的,保留這個註釋
 */
public class CyclicBarrier {

    /**一個柵欄就是一代,Generation變化一次就表明柵欄完成了一次*/
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // 喚醒通知上一代已經完成
        trip.signalAll();
        // 重置計數器開啓新時代
        count = parties;
        generation = new Generation();
    }

    /**
     * 設置當前代中斷,喚醒全部
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;//因故中斷,標識一下
        count = parties;//重置計數器,喚醒全部阻塞線程;
        trip.signalAll();//PS:並無開啓新時代!
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//獲取或等待獲取資源,ysma-1
        try {
            final Generation g = generation;

            if (g.broken)//任一broken則break全部
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {//獲取資源後發現本身被中斷了
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;//獲取資源,計數器減一
           if (index == 0) {  // 達到臨界點,執行barrierCommand,nextGeneration,結束=>放行全部線程
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();//不設置超時,wait,釋放cpu
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//wait指定時間
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {//發生異常,判斷本身爲第一個發起中斷者
                        breakBarrier();
                        throw ie;
                    } else {//發生異常,本身非第一個中斷者
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)//被喚醒後,檢測中斷標誌broken
                    throw new BrokenBarrierException();

                if (g != generation)//若是柵欄已經開啓了下一代,結束,放行
                    return index;

                if (timed && nanos <= 0L) {//被喚醒後,發現超時了,broken中斷
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();//解鎖,釋放資源
        }
    }

    /**構造器,略*/
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**構造器,略*/
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**獲取資源/線程數*/
    public int getParties() {
        return parties;
    }

    /**不限制等待*/
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

    /**限時等待*/
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * 查詢柵欄是否已經broken了
     * PS:重入鎖方式進入查看
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**重置
     * 重入鎖方式進入,break柵欄,開啓新時代
     * */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**獲取還有多少資源沒有就緒*/
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}
相關文章
相關標籤/搜索