Semaphore CountDownLatch CyclicBarrier 源碼分析

java5 中 ,提供了幾個併發工具類 ,Semaphore CountDownLatch CyclicBarrier,在併發編程中很是實用。前二者經過  內部類sync 繼承AQS,使用共享資源的模式,AQS的實現可參考個人另外一篇  AQS 實現分析,前二者根據各自功能需求 , 各自內部實現tryAcquireShared(獲取資源)、tryReleaseShared(釋放)。來定義什麼條件 下來獲取與釋放。而CyclicBarrier內部經過Reentrantlock與Condition組合的方式實現。java

Semaphore

與Reentrantlock相似,也有公平和非公平的機制。這裏就不在分析了,默認是非公平的。編程

經過acquire 獲取併發

Semphore
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);   //非獨佔模式
}
AQS
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)           //當返回值不小於0時,得到資源。
        doAcquireSharedInterruptibly(arg);  //資源獲取失敗,加入隊列尾部,阻塞
}

從AQS的分析中獲得,當tryAcquireShared 返回值小於0,那麼認爲獲取失敗。而對於tryAcquiredShared的實現中,讓其執行時,其返回值需大於0,對於Semaphore信號量,構造時會設置許可的大小 。表示可以獲取的資源,通常大於1。app

默認是非公平 的,然後會調用以下方法,會返回工具

Semaphore NonFairSync
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

減去本次嘗試的獲取的許可數,當結果小於0,直接返回,而若是不小於0 , cas更新許可,若是更新失敗,代表其它線程也在更新,然後 進入下一次循環,直到可用的小於0或者cas成功。當返回值小於0時,阻塞,等待喚醒資源釋放。返回值大於 0,獲取許可成功,繼續執行。oop

釋放ui

當獲取許可的線程執行完時,必須釋放佔有的許可量,this

Semaphore
public void release() {
    sync.releaseShared(1); //釋放時需CAS更新,獨佔模式不需cas
}
AQS
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();  //喚醒head的後繼節點
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))   //由於是 共享鎖,可能有多個線程釋放
            return true;
    }
}

示例:spa

項目中的示例:在開發的項目中,一個數據中心有不少個集羣,集羣中有不少個虛擬機實例,同一個集羣中有多個虛擬機要開機,可是開機前須要使用調度策略,須要更新集羣中的信息,可是隻容許一個集羣只有一個虛擬機能執行策略。這裏就能夠經過Semaphore實現。.net

clusterLockMap.putIfAbsent(cluster.getId(), new Semaphore(1)); //爲每一個集羣初始一個許可

clusterLockMap.get(cluster.getId()).acquire();  //嘗試獲取許可

//執行邏輯

//釋放資源,這裏調用drainPermits的緣由是,在釋放前清空許可,
由於其它 在阻塞的線程若是被中斷了,會將許可值擴大,因此在釋放前,將其清空。

   finally {
            //保證了在任一時刻,只有一個虛擬機能調用策略
        synchronized (clusterLockMap.get(cluster.getId())) {
            clusterLockMap.get(cluster.getId()).drainPermits();    //清空,返回清空數,爲空時直接返回0
            clusterLockMap.get(cluster.getId()).release();
         }
     }

上面的示例中若是不加鎖,異常發生時可能致使許可大於一。

CountDownLatch

相似於計數器,好比經常使用於  一個或幾個線程,要等待其它的線程執行完才能繼續執行。內部只有一個繼承了AQS的sync。只能使用一次。

經常使用方法 await   ,用來等待執行信號。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);  //狀態不爲1時,阻塞。等待被喚醒
}
boolean await(long timeout, TimeUnit unit)  超時尚未爲0,返回false

 

CountDownLatch sync  
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;  
}

判斷當前是否等於0 ,也就是說 countDown 的調用次數是否等於初始化的數量。

countDown  信號的釋放

CountDownLatch 
public void countDown() {
    sync.releaseShared(1);
}
CountDownLatch sync 
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

p實例:  

F1  比賽前須要等待發車信號,全部的車才能出發。  CountDownLatch(1)    ,車比如線程,發車前await。

信號發出,countDown。

F1車進入修息區,必須 全部的檢測 燈都爲綠才能出發。車出發前 await,每個必檢項檢測完調用countDown,才能出發(線程才能運行)。

CyclicBarrier

CyclicBarrier 最大的特別 之處就是,在構造時能夠指定一個線程,而且能夠重複使用。

final Runnable barrierCommand;

該線程的做用在於,當調用await時,且許可爲0 時,執行完 barrierCommand的線程  才能繼續執行。

實現細節,類的結構圖以下:

構造方法時,能夠指定barrierCommand,若是不指定,則功能相似於CountDownLatch,也有點計數的意思 。其從成員變量 能夠看出,內部有ReentrantLock的屬性,內部多是經過ReentrantLock的調用實現。接下來分析源碼。

分析源碼前,幾個關鍵變量注意下,

lock 同步鎖,

trip = lock.newCondition()

count  計數器

generation   內部實現的類,用來表示是否新的計數的開始

構造時指定count,線程調用await,count減1 阻塞,直到count等於0,且barrierCommand執行完若是有的話。

調用方法有兩種:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

內部實現

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier(); //被中斷,本次計數無效,並將標誌置爲broken
            throw new InterruptedException();
        }
        int index = --count; // 計數減一 
        if (index == 0) {  // tripped    
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();  //計數爲0時,執行run方法
                ranAction = true;
                nextGeneration(); //本次計數結束,trip.singnalAll(),計數重置。
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();  //出現異常,本次計數無效。喚醒其它await線程,
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();  // 阻塞
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos); // 限時阻塞,調用await(long timeout, TimeUnit unit),
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)  //  是否仍是同一個計數週期內
                return index; // 順利執行,返回

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();  //只有調用超時await方法,且超時,拋出。
            }
        }
    } finally {
        lock.unlock();
    }
}

generation  的操做   

順利執行後,當計數爲0,且週期重置。

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

執行時出現異常或者等待超時,喚醒。計數無效,使喚醒的線程檢測到generation的broken 標識爲true,拋出BrokenBarrierException 。

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

 

總結:
 

從這上面的分析能夠得出如下狀況

大部分的併發工具類,是經過構造內部類繼承AQS,並根據工具類設計的功能,實現對應的獲取與釋放資源的方法。來使得 調用線程什麼時候阻塞,什麼時候釋放。如CountDownLatch 就不一樣於Semaphore的設計思路,Semaphore是先嚐試獲取資源,獲取到才能釋放,獲取不到,阻塞。雖然實現方式(繼承AQS)同樣。而CountDownLatch  釋放資源 前沒有經過相應的方法獲取,而是直接將當前的許可數減一,實現方法 tryReleaseShared ,然後根據是否計數爲0決定是否喚醒線程。而對應的獲取資源方法 tryAcquireShared,判斷計數器是否已經到達0來判斷是否執行。或者阻塞。獲取後不須要釋放。

而CyclicBarrier 並無直接經過內部sync繼承AQS的方式,而是經過現有的工具類Reentantlock,與Condition組合來實現功能,且還能重用。

相關文章
相關標籤/搜索