Semaphore是一種同步輔助工具,翻譯過來就是信號量,用來實現流量控制,它能夠控制同一時間內對資源的訪問次數.bash
不管是Synchroniezd仍是ReentrantLock,一次都只容許一個線程訪問一個資源,可是Semaphore能夠指定多個線程同時訪問某一個資源.併發
Semaphore有一個構造函數,能夠傳入一個int型整數n,表示某段代碼最多隻有n個線程能夠訪問,若是超出了n,那麼請等待,等到某個線程執行完畢這段代碼塊,下一個線程再進入。dom
信號量上定義兩種操做:ide
信號量主要用於兩個目的:函數
如下的例子:5個線程搶3個車位,同時最多隻有3個線程能搶到車位,等其餘線程釋放信號量後,才能搶到車位.工具
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//申請資源
System.out.println(Thread.currentThread().getName()+"搶到車位");
ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
System.out.println(Thread.currentThread().getName()+"歸還車位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//釋放資源
semaphore.release();
}
}
},"線程"+i).start();
}
}
複製代碼
abstract static class Sync extends AbstractQueuedSynchronizer {
//省略
}
複製代碼
Semaphore內部使用Sync類,Sync又是繼承AbstractQueuedSynchronizer,因此Sync底層仍是使用AQS實現的.Sync有兩個實現類NonfairSync和FairSync,用來指定獲取信號量時是否採用公平策略.ui
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
複製代碼
如上所示,Semaphore默認採用非公平策略,若是須要使用公平策略則可使用帶兩個參數的構造函數來構造Semaphore對象。spa
參數permits被傳遞給AQS的state值,用來表示當前持有的信號量個數.線程
當前線程調用該方法的目的是但願獲取一個信號量資源。翻譯
若是當前信號量個數大於0,則當前信號量的計數會減1,而後該方法直接返回。不然若是當前信號量個數等0,則當前線程會被放入AQS的阻塞隊列。當其餘線程調用了當前線程的interrupt()方法中斷了當前線程時,則當前線程會拋出InterruptedException異常返回。
//Semaphore方法
public void acquire() throws InterruptedException {
//傳遞參數爲1,說明要獲取1個信號量資源
sync.acquireSharedInterruptibly(1);
}
//AQS的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//(1)若是線程被中斷,則拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
//(2)不然調用Sync子類方法嘗試獲取,這裏根據構造函數肯定使用公平策略
if (tryAcquireShared(arg) < 0)
//若是獲取失敗則放入阻塞隊列.而後再次嘗試,若是使用則調用park方法掛起當前線程
doAcquireSharedInterruptibly(arg);
}
複製代碼
由如上代碼可知,acquire()在內部調用了Sync的acquireSharedlnterruptibly方法,後者會對中斷進行響應(若是當前線程被中斷,則拋出中斷異常)。嘗試獲取信號量資源的AQS的方法 tryAcquireShared是由Sync的子類實現的,因此這裏分別從兩 方面來討論。
先討論非公平策略NonfairSync類的tryAcquireShared方法,代碼以下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//獲取當前信號量值
int available = getState();
//計算當前剩餘值
int remaining = available - acquires;
//若是當前剩餘值小於0或則CAS設置成功則返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
複製代碼
如上代碼先獲取當前信號量值(available),而後減去須要獲取的值(acquires),獲得剩餘的信號量個數(remaining),若是剩餘值小於0則說明當前信號量個數知足不了需求,那麼直接返回負數,這時當前線程會被放入AQS的阻塞隊列而被掛起。若是剩餘值大於0,則使用CAS操做設置當前信號量值爲剩餘值,而後返回剩餘值。
另外,因爲NonFairSync是非公平獲取的,也就是說先調用aquire方法獲取信號量的線程不必定比後來者先獲取到信號量。
考慮下面場景,若是線程A先調用了aquire()方法獲取信號量,可是當前信號量個數爲0,那麼線程A會被放入AQS的阻塞隊列 。過一段時間後線程C調用了release()方法釋放了一個信號量,若是當前沒有其餘線程獲取信號量,那麼線程A就會被激活,而後獲取該信號量,可是假如線程C釋放信號量後,線程C調用了aquire方法,那麼線程C就會和線程A去競爭這個信號量資源 。 若是採用非公平策略,由nonfairTryAcquireShared的代碼可知,線程C徹底能夠在線程A被激活前,或者激活後先於線程 A獲取到該信號量,也就是在這種模式下阻塞線程和當前請求的線程是競爭關係,而不遵循先來先得的策略。
下面看公平性的FairSync類是如何保證公平性的。
protected int tryAcquireShared(int acquires) {
for (;;) {
//查詢是否當前線程節點的前驅節點也在等待獲取該資源,有的話直接返回
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
複製代碼
可見公平性仍是靠hasQueuedPredecessors這個函數來保證的。因此Semaphore的公平策略是看當前線程節點的前驅節點是否也在等待獲取該資源,若是是則本身放棄獲取的權限,而後當前線程會被放入AQS阻塞隊列,不然就去獲取。
該方法與acquire()方法不一樣,後者只須要獲取一個信號量值, 而前者則獲取permits個。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
複製代碼
該方法與acquire()相似,不一樣之處在於該方法對中斷不響應,也就是噹噹前線程調用了 acquireUninterruptibly獲取資源時(包含被阻塞後),其餘線程調用了當前線程的interrupt() 方法設置了當前線程的中斷標誌,此時當前線程並不會拋出IllegalArgumentException異常而返回。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
複製代碼
該方法的做用是把當前Semaphore對象的信號量值增長1,若是當前有線程由於調用aquire方法被阻塞而被放入了AQS的阻塞 隊列,則會根據公平策略選擇一個信號量個數能被知足的線程進行激活, 激活的線程會嘗試獲取剛增長的信號量.
public void release() {
//(1)arg=1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//(2)嘗試釋放資源
if (tryReleaseShared(arg)) {
//(3)資源釋放成功則調用park方法喚醒AQS隊列裏面最早掛起的線程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//獲取當前信號量值
int current = getState();
//將當前信號量值增長releases,這裏爲增長1
int next = current + releases;
//移除處理
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//使用CAS保證更新信號量值的原子性
if (compareAndSetState(current, next))
return true;
}
}
複製代碼
由代碼release()->sync.releaseShared(1),可知,release方法每次只會對信號量值增長1,tryReleaseShared方法是無限循環,使用CAS保證了release方法對信號量遞增1的原子性操做.tryReleaseShared方法增長信號量值成功後會執行代碼(3),即調用AQS的方法來激活由於調用acquire方法而被阻塞的線程。
該方法與不帶參數的release方法的不一樣之處在於,前者每次調用會在信號量值原來的基礎上增長 permits,然後者每次增長l。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
複製代碼
另外能夠看到,這裏的sync.releaseShared是共享方法,這說明該信號量是線程共享的,信號量沒有和固定線程綁定,多個線程能夠同時使用CAS去更新信號量的值而不會被阻塞。