Java併發編程筆記之Semaphore信號量源碼分析

JUC 中 Semaphore 的使用與原理分析,Semaphore 也是 Java 中的一個同步器,與 CountDownLatch 和 CycleBarrier 不一樣在於它內部的計數器是遞增的,那麼,Semaphore 的內部實現是怎樣的呢?java

  Semaphore 信號量也是Java 中一個同步容器,與CountDownLatch 和 CyclicBarrier 不一樣之處在於它內部的計數器是遞增的。爲了可以一覽Semaphore的內部結構,咱們首先要看一下Semaphore的類圖,類圖,以下所示:算法

 

 如上類圖能夠知道Semaphoren內部仍是使用AQS來實現的,Sync只是對AQS的一個修飾,而且Sync有兩個實現類,分別表明獲取信號量的時候是否採起公平策略。建立Semaphore的時候會有一個變量標示是否使用公平策略,源碼以下:函數

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new       
        NonfairSync(permits);
    }

   Sync(int permits) {
       setState(permits);
   }

如上面代碼所示,Semaphore默認使用的是非公平策略,若是你須要公平策略,則可使用帶兩個參數的構造函數來構造Semaphore對象,另外和CountDownLatch同樣,構造函數裏面傳遞的初始化信號量個數 permits 被賦值給了AQS 的state狀態變量,也就是說這裏AQS的state值表示當前持有的信號量個數。ui

 

接下來咱們主要看看Semaphore實現的主要方法的源碼,以下:spa

  1.void acquire() 當前線程調用該方法的時候,目的是但願獲取一個信號量資源,若是當前信號量計數個數大於 0 ,而且當前線程獲取到了一個信號量則該方法直接返回,當前信號量的計數會減小 1 。不然會被放入AQS的阻塞隊列,當前線程被掛起,直到其餘線程調用了release方法釋放了信號量,而且當前線程經過競爭獲取到了改信號量。當前線程被其餘線程調用了 interrupte()方法中斷後,當前線程會拋出 InterruptedException異常返回。源碼以下:線程

   public void acquire() throws InterruptedException {
        //傳遞參數爲1,說明要獲取1個信號量資源
        sync.acquireSharedInterruptibly(1);
   }
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {

        //(1)若是線程被中斷,則拋出中斷異常
        if (Thread.interrupted())
            throw new InterruptedException();

        //(2)否者調用sync子類方法嘗試獲取,這裏根據構造函數肯定使用公平策略
        if (tryAcquireShared(arg) < 0)
            //若是獲取失敗則放入阻塞隊列,而後再次嘗試若是失敗則調用park方法掛起當前線程
        doAcquireSharedInterruptibly(arg);
    }

如上代碼可知,acquire()內部調用了sync的acquireSharedInterruptibly  方法,後者是對中斷響應的(若是當前線程被中斷,則拋出中斷異常),嘗試獲取信號量資源的AQS的方法tryAcquireShared 是由 sync 的子類實現,因此這裏就要分公平性了,這裏先討論非公平策略 NonfairSync 類的 tryAcquireShared 方法,源碼以下:code

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
} final
int nonfairTryAcquireShared(int acquires) { for (;;) { //獲取當前信號量值 int available = getState(); //計算當前剩餘值 int remaining = available - acquires; //若是當前剩餘小於0或者CAS設置成功則返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

如上代碼,先計算當前信號量值(available)減去須要獲取的值(acquires) 獲得剩餘的信號量個數(remaining),若是剩餘值小於 0 說明當前信號量個數知足不了需求,則直接返回負數,而後當前線程會被放入AQS的阻塞隊列,當前線程被掛起。若是剩餘值大於 0 則使用CAS操做設置當前信號量值爲剩餘值,而後返回剩餘值。另外能夠知道NonFairSync是非公平性獲取的,是說先調用aquire方法獲取信號量的線程不必定比後來者先獲取鎖。對象

 

接下來咱們要看看公平性的FairSync 類是如何保證公平性的,源碼以下:blog

 protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
 }

能夠知道公平性仍是靠 hasQueuedPredecessors 這個方法來作的,之前的隨筆已經講過公平性是看當前線程節點是否有前驅節點也在等待獲取該資源,若是是則本身放棄獲取的權力,而後當前線程會被放入AQS阻塞隊列,不然就去獲取。hasQueuedPredecessors源碼以下:隊列

public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

如上面代碼所示,若是當前線程節點有前驅節點則返回true,不然若是當前AQS隊列爲空 或者 當前線程節點是AQS的第一個節點則返回 false ,其中,若是 h == t 則說明當前隊列爲空則直接返回 false,若是 h !=t 而且 s == null 說明有一個元素將要做爲AQS的第一個節點入隊列(回顧下 enq 函數第一個元素入隊列是兩步操做,首先建立一個哨兵頭節點,而後第一個元素插入到哨兵節點後面),那麼返回 true,若是  h !=t 而且 s != null 而且  s.thread != Thread.currentThread() 則說明隊列裏面的第一個元素不是當前線程則返回 true。

 

  2.void acquire(int permits) 該方法與 acquire() 不一樣在與後者只須要獲取一個信號量值,而前者則獲取指定 permits 個,源碼以下:

public void acquire(int permits) throws InterruptedException {
        if (permits < 0) 
       throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }

 

  3.void acquireUninterruptibly() 該方法與 acquire() 相似,不一樣之處在於該方法對中斷不響應,也就是噹噹前線程調用了 acquireUninterruptibly 獲取資源過程當中(包含被阻塞後)其它線程調用了當前線程的 interrupt()方法設置了當前線程的中斷標誌當前線程並不會拋出 InterruptedException 異常而返回。源碼以下:

public void acquireUninterruptibly() {
     sync.acquireShared(1);
}

 

  4.void acquireUninterruptibly(int permits) 該方法與 acquire(int permits) 不一樣在於該方法對中斷不響應。源碼如以下:

 public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
 }

 

  5.void release() 該方法做用是把當前 semaphore對象的信號量值增長 1 ,若是當前有線程由於調用 acquire 方法被阻塞放入了 AQS的阻塞隊列,則會根據公平策略選擇一個線程進行激活,激活的線程會嘗試獲取剛增長的信號量,源碼以下:

  public void release() {
        //(1)arg=1
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {

        //(2)嘗試釋放資源
        if (tryReleaseShared(arg)) {

            //(3)資源釋放成功則調用park喚醒AQS隊列裏面最早掛起的線程
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {

            //(4)獲取當前信號量值
            int current = getState();

            //(5)當前信號量值增長releases,這裏爲增長1
            int next = current + releases;
            if (next < current) // 移除處理
                throw new Error("Maximum permit count exceeded");

            //(6)使用cas保證更新信號量值的原子性
            if (compareAndSetState(current, next))
                return true;
        }
    }

如上面代碼能夠看到 release()方法中對 sync.releaseShared(1),能夠知道release方法每次只會對信號量值增長 1 ,tryReleaseShared方法是無限循環,使用CAS保證了 release 方法對信號量遞增 1 的原子性操做。當tryReleaseShared 方法增長信號量成功後會執行代碼(3),調用AQS的方法來激活由於調用acquire方法而被阻塞的線程。

 

  6.void release(int permits) 該方法與不帶參數的不一樣之處在於前者每次調用會在信號量值原來基礎上增長 permits,然後者每次增長 1。源碼以下:

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
}

另外注意到這裏調用的是 sync.releaseShared 是共享方法,這說明該信號量是線程共享的,信號量沒有和固定線程綁定,多個線程能夠同時使用CAS去更新信號量的值而不會阻塞。

 

到目前已經知道了其原理,接下來用一個例子來加深對Semaphore的理解,例子以下:

package com.hjc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by cong on 2018/7/8.
 */
public class SemaphoreTest {

    // 建立一個Semaphore實例
    private static volatile Semaphore semaphore = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入線程A到線程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 加入線程B到線程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 等待子線程執行完畢,返回
        semaphore.acquire(2);
        System.out.println("all child thread over!");

        //關閉線程池
        executorService.shutdown();
    }
}

運行結果以下:

相似於 CountDownLatch,上面咱們的例子也是在主線程中開啓兩個子線程進行執行,等全部子線程執行完畢後主線程在繼續向下運行。

如上代碼首先首先建立了一個信號量實例,構造函數的入參爲 0,說明當前信號量計數器爲 0,而後 main 函數添加兩個線程任務到線程池,每一個線程內部調用了信號量的 release 方法,至關於計數值遞增一,最後在 main 線程裏面調用信號量的 acquire 方法,參數傳遞爲 2 說明調用 acquire 方法的線程會一直阻塞,直到信號量的計數變爲 2 時纔會返回。

看到這裏也就明白了,若是構造 Semaphore 時候傳遞的參數爲 N,在 M 個線程中調用了該信號量的 release 方法,那麼在調用 acquire 對 M 個線程進行同步時候傳遞的參數應該是 M+N;

 

對CountDownLatch,CyclicBarrier,Semaphored這三者之間的比較總結:

  1.CountDownLatch 經過計數器提供了更靈活的控制,只要檢測到計數器爲 0,而無論當前線程是否結束調用 await 的線程就能夠往下執行,相比使用 jion 必須等待線程執行完畢後主線程纔會繼續向下運行更靈活。

  2.CyclicBarrier 也能夠達到 CountDownLatch 的效果,可是後者當計數器變爲 0 後,就不能在被複用,而前者則使用 reset 方法能夠重置後複用,前者對同一個算法可是輸入參數不一樣的相似場景下比較適用。

  3.而 semaphore 採用了信號量遞增的策略,一開始並不須要關心須要同步的線程個數,等調用 aquire 時候在指定須要同步個數,而且提供了獲取信號量的公平性策略。

相關文章
相關標籤/搜索