源碼:html
package java.util.concurrent; import java.util.Collection; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L;//版本號 private final Sync sync;//內部類 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L;//版本號 //構造器,將容許運行的數量設爲permits Sync(int permits) { setState(permits); } //根據state獲得容許運行的數量 final int getPermits() { return getState(); } //不公平共享鎖獲取(狀態減去acquires) final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState();//獲取當前狀態 int remaining = available - acquires;//狀態值減acquires //若狀態值小於0則不更新,直接返回-1,表示獲取鎖失敗 //狀態值大於等於0,更新當前狀態值,返回大於等於0則表示成功 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining;//只有此處才能退出循環 } } //公平鎖釋放(狀態增長acquires) protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState();//獲取當前狀態 int next = current + releases;//狀態值加acquires if (next < current)//若狀態值有誤,拋出異常 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next))//只有修改爲功,才能退出循環 return true; } } //容許運行數減小reductions final void reducePermits(int reductions) { for (;;) { int current = getState();//獲得當前容許運行數 int next = current - reductions;//將容許運行數減去reductions if (next > current)//若發現容許運行數發生了變化,則拋出異常 throw new Error("Permit count underflow"); if (compareAndSetState(current, next))//只有修改爲功,才能退出循環 return; } } //返回容許運行的數,並將容許運行數置爲0(「耗盡」全部剩餘共享資源) final int drainPermits() { for (;;) { int current = getState();//獲得當前狀態值 if (current == 0 || compareAndSetState(current, 0))//只有容許修改爲功或剩餘爲0,才能退出循環 return current;//返回當前狀態值 } } } //不公平策略 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L;//版本號 //和公平策略構造器相同 NonfairSync(int permits) {//調用sync的構造器初始化容許運行數 super(permits); } //直接調用nonfairTryAcquireShared,嘗試獲取不公平的共享鎖 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } //公平策略 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L;//版本號 //和不公平策略構造器相同 FairSync(int permits) {//調用sync的構造器初始化容許運行數 super(permits); } //嘗試獲取公平的共享鎖。和不公平的共享鎖不一樣的關鍵方法 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors())//存在前繼等待節點則直接返回 return -1; int available = getState();//獲得當前狀態值 int remaining = available - acquires;//當前狀態減去acquires if (remaining < 0 || compareAndSetState(available, remaining)) return remaining;//修改爲功 } } } //permits:一次性容許運行的線程數 //一個參數構造默認使用不公平策略. public Semaphore(int permits) { sync = new NonfairSync(permits); } //permits:一次性容許運行的線程數 //fair:是否使用公平策略. public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } //可響應中斷的獲取共享鎖1個(公平和不公平) public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //不支持響應中斷的獲取共享鎖1個(公平和不公平) public void acquireUninterruptibly() { sync.acquireShared(1); } //不公平的嘗試獲取共享鎖1個(不支持中斷) public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0;//大於等於0,則獲取成功 } //支持中斷在指定時間內獲取共享鎖1個 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //計數值增長1 public void release() { sync.releaseShared(1);//計數值增長1 } /******************不使用時,每次調用一次性減1;**************************/ /******************傳入permits時,每次調用一次性減permits;***************/ //可響應中斷的獲取共享鎖(公平和不公平)permits個 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException();//permits小於0,則拋出非法參數異常 sync.acquireSharedInterruptibly(permits); } //不支持響應中斷的獲取共享鎖permits個(公平和不公平) public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException();//permits小於0,則拋出非法參數異常 sync.acquireShared(permits); } //不公平的嘗試獲取共享鎖permits個(不支持中斷) public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException();//permits小於0,則拋出非法參數異常 return sync.nonfairTryAcquireShared(permits) >= 0;//大於等於0,則獲取成功 } //支持中斷在指定時間內獲取共享鎖permits個 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException();//permits小於0,則拋出非法參數異常 return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } //增長指定數目permits(增長計數值permits) public void release(int permits) { if (permits < 0) throw new IllegalArgumentException();//permits小於0,則拋出非法參數異常 sync.releaseShared(permits);//增長計數值 } //獲得當前運行的數 public int availablePermits() { return sync.getPermits(); } //返回容許運行的數,並將容許運行數置爲0(「耗盡」全部剩餘共享資源) public int drainPermits() { return sync.drainPermits(); } //將容許運行數減小指定數目reduction.(「縮減」剩餘共享資源) protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException();//reduction小於0,則拋出非法參數異常 sync.reducePermits(reduction); } //根據判斷sync 是否爲FairSync類型,返回是否爲公平鎖 public boolean isFair() { return sync instanceof FairSync; } //返回隊列中是否存在等待狀態的節點 public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } //獲得隊列元素的總數 public final int getQueueLength() { return sync.getQueueLength(); } //獲得隊列中的線程集合 protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } //獲得字符串表示 public String toString() { return super.toString() + "[Permits = " + sync.getPermits() + "]"; } }
一個計數信號量,信號量維護了一個許可集。java
在許可可用前會阻塞每個調用acquire()的線程。編程
已獲取共享鎖的線程,執行 release()
添加一個許可,從而可能釋放一個正在阻塞的獲取者。api
Semaphore 一般用於限制能夠訪問某些資源(物理或邏輯的)的線程數目。數據結構
Semaphore(int permits) 建立具備給定的許可數和非公平的公平設置的 Semaphore 。 |
Semaphore(int permits, boolean fair) 建立具備給定的許可數和給定的公平設置的 Semaphore 。 |
構造方法可選地接受一個公平 參數。當設置爲 false 時,此類不對線程獲取許可的順序作任何保證。ide
特別地,闖入 是容許的,也就是說能夠在已經等待的線程前爲調用 acquire()
的線程分配一個許可,從邏輯上說,就是新線程將本身置於等待線程隊列的頭部。測試
當公平設置爲 true 時,信號量保證對於任何調用獲取
方法的線程而言,都按照處理它們調用這些方法的順序(即先進先出;FIFO)來選擇線程、得到許可。ui
非同步的 tryAcquire
方法不使用公平設置,而是使用任意可用的許可。spa
一般,應該將用於控制資源訪問的信號量初始化爲公平的,以確保全部線程均可訪問資源。.net
爲其餘的種類的同步控制使用信號量時,非公平排序的吞吐量優點一般要比公平考慮更爲重要。
void |
acquire() 今後信號量獲取一個許可,在提供一個許可前一直將線程阻塞,不然線程被中斷。 |
void |
acquire(int permits) 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。 |
void |
acquireUninterruptibly() 今後信號量中獲取許可,在有可用的許可前將其阻塞。 |
void |
acquireUninterruptibly(int permits) 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。 |
int |
availablePermits() 返回此信號量中當前可用的許可數。 |
int |
drainPermits() 獲取並返回當即可用的全部許可。 |
protected Collection<Thread> |
getQueuedThreads() 返回一個 collection,包含可能等待獲取的線程。 |
int |
getQueueLength() 返回正在等待獲取的線程的估計數目。 |
boolean |
hasQueuedThreads() 查詢是否有線程正在等待獲取。 |
boolean |
isFair() 若是此信號量的公平設置爲 true,則返回 true 。 |
protected void |
reducePermits(int reduction) 根據指定的縮減量減少可用許可的數目。 |
void |
release() 釋放一個許可,將其返回給信號量。 |
void |
release(int permits) 釋放給定數目的許可,將其返回到信號量。 |
String |
toString() 返回標識此信號量的字符串,以及信號量的狀態。 |
boolean |
tryAcquire() 僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。 |
boolean |
tryAcquire(int permits) 僅在調用時此信號量中有給定數目的許可時,才今後信號量中獲取這些許可。 |
boolean |
tryAcquire(int permits, long timeout, TimeUnit unit) 若是在給定的等待時間內此信號量有可用的全部許可,而且當前線程未被中斷,則今後信號量獲取給定數目的許可。 |
boolean |
tryAcquire(long timeout, TimeUnit unit) 若是在給定的等待時間內,此信號量有可用的許可而且當前線程未被中斷,則今後信號量獲取一個許可。 |
public Semaphore(int permits)
建立具備給定的許可數和非公平的公平設置的 Semaphore
。
參數:
permits
- 初始的可用許可數目。此值可能爲負數,在這種狀況下,必須在授予任何獲取前進行釋放。
public Semaphore(int permits, boolean fair)
建立具備給定的許可數和給定的公平設置的 Semaphore
。
參數:
permits
- 初始的可用許可數目。此值可能爲負數,在這種狀況下,必須在授予任何獲取前進行釋放。
fair
- 若是此信號量保證在爭用時按先進先出的順序授予許可,則爲 true
;不然爲 false
。
public void acquire() throws InterruptedException
今後信號量獲取一個許可,在提供一個許可前一直將線程阻塞,不然線程被 中斷。
獲取一個許可(若是提供了一個)並當即返回,將可用的許可數減 1。
若是沒有可用的許可,則在發生如下兩種狀況之一前,禁止將當前線程用於線程安排目的並使其處於休眠狀態:
若是當前線程:
中斷
。 則拋出 InterruptedException
,而且清除當前線程的已中斷狀態。
拋出:
InterruptedException
- 若是當前線程被中斷
public void acquireUninterruptibly()
今後信號量中獲取許可,在有可用的許可前將其阻塞。
獲取一個許可(若是提供了一個)並當即返回,將可用的容許數減 1。
若是沒有可用的許可,則在其餘某些線程調用此信號量的 release()
方法,而且當前線程是下一個要被分配許可的線程前,禁止當前線程用於線程安排目的並使其處於休眠狀態。
若是當前線程在等待許可時被中斷,那麼它將繼續等待,可是與沒有發生中斷,其將接收容許的時間相比,爲該線程分配許可的時間可能改變。當線程確實今後方法返回後,將設置其中斷狀態。
public boolean tryAcquire()
僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。
獲取一個許可(若是提供了一個)並當即返回,其值爲 true
,將可用的許可數減 1。
若是沒有可用的許可,則此方法當即返回而且值爲 false
。
即便已將此信號量設置爲使用公平排序策略,可是調用 tryAcquire()
也將 當即獲取許可(若是有一個可用),而無論當前是否有正在等待的線程。在某些狀況下,此「闖入」行爲可能頗有用,即便它會打破公平性也如此。若是但願遵照公平設置,則使用 tryAcquire(0, TimeUnit.SECONDS)
,它幾乎是等效的(它也檢測中斷)。
返回:
若是獲取了許可,則返回 true
;不然返回 false
。
public boolean tryAcquire(long timeout,TimeUnit unit) throws InterruptedException
若是在給定的等待時間內,此信號量有可用的許可而且當前線程未被 中斷,則今後信號量獲取一個許可。
獲取一個許可(若是提供了一個)並當即返回,其值爲 true
,將可用的許可數減 1。
若是沒有可用的容許,則在發生如下三種狀況之一前,禁止將當前線程用於線程安排目的並使其處於休眠狀態:
release()
方法而且當前線程是下一個被分配許可的線程;或者 若是獲取了許可,則返回值爲 true
。
若是當前線程:
則拋出 InterruptedException
,而且清除當前線程的已中斷狀態。
若是超出了指定的等待時間,則返回值爲 false
。若是該時間小於等於 0,則方法根本不等待。
參數:
timeout
- 等待許可的最多時間
unit
- timeout
參數的時間單位
返回:
若是獲取了許可,則返回 true
;若是獲取許可前超出了等待時間,則返回 false
拋出:
InterruptedException
- 若是當前線程是已中斷的
public void release()
釋放一個許可,將其返回給信號量。
釋放一個許可,將可用的許可數增長 1。若是任意線程試圖獲取許可,則選中一個線程並將剛剛釋放的許可給予它。而後針對線程安排目的啓用(或再啓用)該線程。
不要求釋放許可的線程必須經過調用 acquire()
來獲取許可。經過應用程序中的編程約定來創建信號量的正確用法。
public void acquire(int permits) throws InterruptedException
今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被 中斷。
獲取給定數目的許可(若是提供了)並當即返回,將可用的許可數減去給定的量。
若是沒有足夠的可用許可,則在發生如下兩種狀況之一前,禁止將當前線程用於線程安排目的並使其處於休眠狀態:
若是當前線程:
則拋出 InterruptedException
,而且清除當前線程的已中斷狀態。任何本來應該分配給此線程的許可將被分配給其餘試圖獲取許可的線程,就好像已經過調用 release()
而使許可可用同樣。
參數:
permits
- 要獲取的許可數
拋出:
InterruptedException
- 若是當前線程已被中斷
IllegalArgumentException
- 若是 permits
爲負
public void acquireUninterruptibly(int permits)
今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。
獲取給定數目的許可(若是提供了)並當即返回,將可用的許可數減去給定的量。
若是沒有足夠的可用許可,則在其餘某些線程調用此信號量的某個釋放
方法,當前線程是下一個要被分配許可的線程,而且可用的許可數目知足此請求前,禁止當前線程用於線程安排目的並使其處於休眠狀態。
若是當前的線程在等待許可時被中斷,則它會繼續等待而且它在隊列中的位置不受影響。當線程確實今後方法返回後,將其設置爲中斷狀態。
參數:
permits
- 要獲取的許可數
拋出:
IllegalArgumentException
- 若是 permits
爲負
public boolean tryAcquire(int permits)
僅在調用時此信號量中有給定數目的許可時,才今後信號量中獲取這些許可。
獲取給定數目的許可(若是提供了)並當即返回,其值爲 true
,將可用的許可數減去給定的量。
若是沒有足夠的可用許可,則此方法當即返回,其值爲 false
,而且不改變可用的許可數。
即便已將此信號量設置爲使用公平排序策略,可是調用 tryAcquire
也將 當即獲取許可(若是有一個可用),而無論當前是否有正在等待的線程。在某些狀況下,此「闖入」行爲可能頗有用,即便它會打破公平性也如此。若是但願遵照公平設置,則使用 tryAcquire(permits, 0, TimeUnit.SECONDS)
,它幾乎是等效的(它也檢測中斷)。
參數:
permits
- 要獲取的許可數
返回:
若是獲取了許可,則返回 true
;不然返回 false
拋出:
IllegalArgumentException
- 若是 permits
爲負
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
若是在給定的等待時間內此信號量有可用的全部許可,而且當前線程未被 中斷,則今後信號量獲取給定數目的許可。
獲取給定數目的許可(若是提供了)並當即返回,其值爲 true
,將可用的許可數減去給定的量。
若是沒有足夠的可用許可,則在發生如下三種狀況之一前,禁止將當前線程用於線程安排目的並使其處於休眠狀態:
若是獲取了許可,則返回值爲 true
。
若是當前線程:
則拋出 InterruptedException
,而且清除當前線程的已中斷狀態。任何本來應該分配給此線程的許可將被分配給其餘試圖獲取許可的線程,就好像已經過調用 release()
而使許可可用同樣。
若是超出了指定的等待時間,則返回值爲 false
。若是該時間小於等於 0,則方法根本不等待。任何本來應該分配給此線程的許可將被分配給其餘試圖獲取許可的線程,就好像已經過調用 release()
而使許可可用同樣。
參數:
permits
- 要獲取的許可數
timeout
- 等待許可的最多時間
unit
- timeout
參數的時間單位
返回:
若是獲取了許可,則返回 true
;若是獲取全部許可前超出了等待時間,則返回 false
拋出:
InterruptedException
- 若是當前線程是已中斷的
IllegalArgumentException
- 若是 permits
爲負
public void release(int permits)
釋放給定數目的許可,將其返回到信號量。
釋放給定數目的許可,將可用的許可數增長該量。若是任意線程試圖獲取許可,則選中某個線程並將剛剛釋放的許可給予該線程。若是可用許可的數目知足該線程的請求,則針對線程安排目的啓用(或再啓用)該線程;不然在有足夠的可用許可前線程將一直等待。若是知足此線程的請求後仍有可用的許可,則依次將這些許可分配給試圖獲取許可的其餘線程。
不要求釋放許可的線程必須經過調用獲取
來獲取該許可。經過應用程序中的編程約定來創建信號量的正確用法。
參數:
permits
- 要釋放的許可數
拋出:
IllegalArgumentException
- 若是 permits
爲負
public int availablePermits()
返回此信號量中當前可用的許可數。
此方法一般用於調試和測試目的。
返回:
此信號量中的可用許可數
public int drainPermits()
獲取並返回當即可用的全部許可。
返回:
獲取的許可數
protected void reducePermits(int reduction)
根據指定的縮減量減少可用許可的數目。此方法在使用信號量來跟蹤那些變爲不可用資源的子類中頗有用。此方法不一樣於 acquire
,在許可變爲可用的過程當中,它不會阻塞等待。
參數:
reduction
- 要移除的許可數
拋出:
IllegalArgumentException
- 若是 reduction
是負數
public boolean isFair()
若是此信號量的公平設置爲 true,則返回 true
。
返回:
若是此信號量的公平設置爲 true,則返回 true
public final boolean hasQueuedThreads()
查詢是否有線程正在等待獲取。注意,由於同時可能發生取消,因此返回 true
並不保證有其餘線程等待獲取許可。此方法主要用於監視系統狀態。
返回:
若是可能有其餘線程正在等待獲取鎖,則返回 true
public final int getQueueLength()
返回正在等待獲取的線程的估計數目。該值僅是估計的數字,由於在此方法遍歷內部數據結構的同時,線程的數目可能動態地變化。此方法用於監視系統狀態,不用於同步控制。
返回:
正在等待此鎖的線程的估計數目
protected Collection<Thread> getQueuedThreads()
返回一個 collection,包含可能等待獲取的線程。由於在構造此結果的同時實際的線程 set 可能動態地變化,因此返回的 collection 僅是盡力的估計值。所返回 collection 中的元素沒有特定的順序。此方法用於加快子類的構造速度,提供更多的監視設施。
返回:
線程 collection
public String toString()
返回標識此信號量的字符串,以及信號量的狀態。括號中的狀態包括 String 類型的 "Permits ="
,後跟許可數。
覆蓋:
返回:
標識此信號量的字符串,以及信號量的狀態
package com.thread; import java.util.concurrent.Semaphore; public class SemaphoreDemo implements Runnable{ private Semaphore smp = new Semaphore(3); @Override public void run() { try { System.out.println("Thread " + Thread.currentThread().getName() + " start"); smp.acquire(); System.out.println("Thread " + Thread.currentThread().getName() + " is working"); Thread.sleep(1000); smp.release(); System.out.println("Thread " + Thread.currentThread().getName() + " is over"); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args){ SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); for (int i=1;i<=9;i++){ new Thread(semaphoreDemo).start(); } } }
運行結果:
Thread Thread-0 start
Thread Thread-3 start
Thread Thread-0 is working
Thread Thread-1 start
Thread Thread-2 start
Thread Thread-1 is working
Thread Thread-6 start
Thread Thread-5 start
Thread Thread-3 is working
Thread Thread-4 start
Thread Thread-7 start
Thread Thread-8 start
Thread Thread-0 is over
Thread Thread-2 is working
Thread Thread-3 is over
Thread Thread-5 is working
Thread Thread-1 is over
Thread Thread-6 is working
Thread Thread-2 is over
Thread Thread-5 is over
Thread Thread-8 is working
Thread Thread-4 is working
Thread Thread-7 is working
Thread Thread-6 is over
Thread Thread-8 is over
Thread Thread-7 is over
Thread Thread-4 is over
紅色部分說明:只能一次性運行3個線程,必需要等待某一個線程執行 release() 以後,才能喚醒等待的某一個線程繼續執行。
青色部分說明:因爲打印語句的執行可能會滯後,所以此處的運行結果不能說明 Semaphore 設計有誤,因此運行結果只能做爲參考。
package com.thread; import java.util.concurrent.Semaphore; public class SemaphoreDemo implements Runnable{ private Semaphore smp = new Semaphore(3); @Override public void run() { try { System.out.println("Thread " + Thread.currentThread().getName() + " start"); smp.acquire(3); System.out.println("Thread " + Thread.currentThread().getName() + " is working"); Thread.sleep(1000); smp.release(3); System.out.println("Thread " + Thread.currentThread().getName() + " is over"); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args){ SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); for (int i=1;i<=9;i++){ new Thread(semaphoreDemo).start(); } } }
運行結果:
Thread Thread-0 start
Thread Thread-3 start
Thread Thread-4 start
Thread Thread-6 start
Thread Thread-1 start
Thread Thread-7 start
Thread Thread-8 start
Thread Thread-0 is working
Thread Thread-5 start
Thread Thread-2 start
Thread Thread-0 is over
Thread Thread-3 is working
Thread Thread-3 is over
Thread Thread-4 is working
Thread Thread-4 is over
Thread Thread-6 is working
Thread Thread-6 is over
Thread Thread-1 is working
Thread Thread-1 is over
Thread Thread-7 is working
Thread Thread-7 is over
Thread Thread-8 is working
Thread Thread-8 is over
Thread Thread-5 is working
Thread Thread-5 is over
Thread Thread-2 is working
Thread Thread-2 is over
因爲須要一次性獲取3個,因此只有一個線程執行完畢,才能執行下一個線程。