Java多線程-併發工具類Semaphore詳解

簡介

Semaphore是一種同步輔助工具,翻譯過來就是信號量,用來實現流量控制,它能夠控制同一時間內對資源的訪問次數.bash

不管是Synchroniezd仍是ReentrantLock,一次都只容許一個線程訪問一個資源,可是Semaphore能夠指定多個線程同時訪問某一個資源.併發

Semaphore有一個構造函數,能夠傳入一個int型整數n,表示某段代碼最多隻有n個線程能夠訪問,若是超出了n,那麼請等待,等到某個線程執行完畢這段代碼塊,下一個線程再進入。dom

信號量上定義兩種操做:ide

  • acquire(獲取):當一個線程調用acquire操做時,它要麼成功獲取到信號量(信號量減1),要麼一直等下去,直到有線程釋放信號量,或超時,Semaphore內部會維護一個等待隊列用於存儲這些被暫停的線程.
  • release(釋放)實際上會將信號量的值+1,而後喚醒相應Sepmaphore實例的等待隊列中的一個任意等待線程.

應用場景

信號量主要用於兩個目的:函數

  • 用於多個共享資源的互斥使用.
  • 用於併發線程數的控制.

例子

如下的例子:5個線程搶3個車位,同時最多隻有3個線程能搶到車位,等其餘線程釋放信號量後,才能搶到車位.工具

public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(3);

		for (int i = 0; i < 5; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						semaphore.acquire();//申請資源
						System.out.println(Thread.currentThread().getName()+"搶到車位");
						ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
						System.out.println(Thread.currentThread().getName()+"歸還車位");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}finally {
					    //釋放資源
						semaphore.release();
					}

				}
			},"線程"+i).start();
		}
	}
複製代碼

注意事項

  • Semaphore.acquire()和Semaphore.release()老是配對使用的,這點須要由應用代碼自身保證.
  • Semaphore.release()調用應該放在finally塊中,已避免應用代碼出現異常的狀況下,當前線程所得到的信號量沒法返還.
  • 若是Semaphore構造器中的參數permits值設置爲1,所建立的Semaphore至關於一個互斥鎖.與其餘互斥鎖不一樣的是,這種互斥鎖容許一個線程釋放另一個線程所持有的鎖.由於一個線程能夠在未執行過Semaphore.acquire()的狀況下執行相應的Semaphore.release().
  • 默認狀況下,Semaphore採用的是非公平性調度策略.

原理

abstract static class Sync extends AbstractQueuedSynchronizer {
    //省略
 }
複製代碼

Semaphore內部使用Sync類,Sync又是繼承AbstractQueuedSynchronizer,因此Sync底層仍是使用AQS實現的.Sync有兩個實現類NonfairSync和FairSync,用來指定獲取信號量時是否採用公平策略.ui

初始化方法

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}


public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync(int permits) {
    setState(permits);
}
複製代碼

如上所示,Semaphore默認採用非公平策略,若是須要使用公平策略則可使用帶兩個參數的構造函數來構造Semaphore對象。spa

參數permits被傳遞給AQS的state值,用來表示當前持有的信號量個數.線程

void acquire()方法

當前線程調用該方法的目的是但願獲取一個信號量資源。翻譯

若是當前信號量個數大於0,則當前信號量的計數會減1,而後該方法直接返回。不然若是當前信號量個數等0,則當前線程會被放入AQS的阻塞隊列。當其餘線程調用了當前線程的interrupt()方法中斷了當前線程時,則當前線程會拋出InterruptedException異常返回。

//Semaphore方法
public void acquire() throws InterruptedException {
    //傳遞參數爲1,說明要獲取1個信號量資源
    sync.acquireSharedInterruptibly(1);
}

//AQS的方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //(1)若是線程被中斷,則拋出中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    //(2)不然調用Sync子類方法嘗試獲取,這裏根據構造函數肯定使用公平策略
    if (tryAcquireShared(arg) < 0)
        //若是獲取失敗則放入阻塞隊列.而後再次嘗試,若是使用則調用park方法掛起當前線程
        doAcquireSharedInterruptibly(arg);
}
複製代碼

由如上代碼可知,acquire()在內部調用了Sync的acquireSharedlnterruptibly方法,後者會對中斷進行響應(若是當前線程被中斷,則拋出中斷異常)。嘗試獲取信號量資源的AQS的方法 tryAcquireShared是由Sync的子類實現的,因此這裏分別從兩 方面來討論。

先討論非公平策略NonfairSync類的tryAcquireShared方法,代碼以下:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //獲取當前信號量值
        int available = getState();
        //計算當前剩餘值
        int remaining = available - acquires;
        //若是當前剩餘值小於0或則CAS設置成功則返回
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
複製代碼

如上代碼先獲取當前信號量值(available),而後減去須要獲取的值(acquires),獲得剩餘的信號量個數(remaining),若是剩餘值小於0則說明當前信號量個數知足不了需求,那麼直接返回負數,這時當前線程會被放入AQS的阻塞隊列而被掛起。若是剩餘值大於0,則使用CAS操做設置當前信號量值爲剩餘值,而後返回剩餘值。

另外,因爲NonFairSync是非公平獲取的,也就是說先調用aquire方法獲取信號量的線程不必定比後來者先獲取到信號量。

考慮下面場景,若是線程A先調用了aquire()方法獲取信號量,可是當前信號量個數爲0,那麼線程A會被放入AQS的阻塞隊列 。過一段時間後線程C調用了release()方法釋放了一個信號量,若是當前沒有其餘線程獲取信號量,那麼線程A就會被激活,而後獲取該信號量,可是假如線程C釋放信號量後,線程C調用了aquire方法,那麼線程C就會和線程A去競爭這個信號量資源 。 若是採用非公平策略,由nonfairTryAcquireShared的代碼可知,線程C徹底能夠在線程A被激活前,或者激活後先於線程 A獲取到該信號量,也就是在這種模式下阻塞線程和當前請求的線程是競爭關係,而不遵循先來先得的策略。

下面看公平性的FairSync類是如何保證公平性的。

protected int tryAcquireShared(int acquires) {
    for (;;) {
        //查詢是否當前線程節點的前驅節點也在等待獲取該資源,有的話直接返回
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
複製代碼

可見公平性仍是靠hasQueuedPredecessors這個函數來保證的。因此Semaphore的公平策略是看當前線程節點的前驅節點是否也在等待獲取該資源,若是是則本身放棄獲取的權限,而後當前線程會被放入AQS阻塞隊列,不然就去獲取。

void acquire(int permits)方法

該方法與acquire()方法不一樣,後者只須要獲取一個信號量值, 而前者則獲取permits個。

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
複製代碼

void acquireUninterruptibly()方法

該方法與acquire()相似,不一樣之處在於該方法對中斷不響應,也就是噹噹前線程調用了 acquireUninterruptibly獲取資源時(包含被阻塞後),其餘線程調用了當前線程的interrupt() 方法設置了當前線程的中斷標誌,此時當前線程並不會拋出IllegalArgumentException異常而返回。

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
複製代碼

void release()方法

該方法的做用是把當前Semaphore對象的信號量值增長1,若是當前有線程由於調用aquire方法被阻塞而被放入了AQS的阻塞 隊列,則會根據公平策略選擇一個信號量個數能被知足的線程進行激活, 激活的線程會嘗試獲取剛增長的信號量.

public void release() {
    //(1)arg=1
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    //(2)嘗試釋放資源
    if (tryReleaseShared(arg)) {
        //(3)資源釋放成功則調用park方法喚醒AQS隊列裏面最早掛起的線程
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //獲取當前信號量值
        int current = getState();
        //將當前信號量值增長releases,這裏爲增長1
        int next = current + releases;
        //移除處理
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //使用CAS保證更新信號量值的原子性
        if (compareAndSetState(current, next))
            return true;
    }
}
複製代碼

由代碼release()->sync.releaseShared(1),可知,release方法每次只會對信號量值增長1,tryReleaseShared方法是無限循環,使用CAS保證了release方法對信號量遞增1的原子性操做.tryReleaseShared方法增長信號量值成功後會執行代碼(3),即調用AQS的方法來激活由於調用acquire方法而被阻塞的線程。

void release(int permits)方法

該方法與不帶參數的release方法的不一樣之處在於,前者每次調用會在信號量值原來的基礎上增長 permits,然後者每次增長l。

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
複製代碼

另外能夠看到,這裏的sync.releaseShared是共享方法,這說明該信號量是線程共享的,信號量沒有和固定線程綁定,多個線程能夠同時使用CAS去更新信號量的值而不會被阻塞。

相關文章
相關標籤/搜索