Semaphore是計數信號量。Semaphore管理一系列許可證。每一個acquire方法阻塞,直到有一個許可證能夠得到而後拿走一個許可證;每一個release方法增長一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並無實際的許可證這個對象,Semaphore只是維持了一個可得到許可證的數量。java
Semaphore常常用於限制獲取某種資源的線程數量。下面舉個例子,好比說操場上有5個跑道,一個跑道一次只能有一個學生在上面跑步,一旦全部跑道在使用,那麼後面的學生就須要等待,直到有一個學生不跑了,下面是這個例子:ui
public class Playground { private String[] tracks = {"跑道1","跑道2","跑道3","跑道4","跑道5"};//一共有5個跑道 private volatile boolean[] used = new boolean[5];//標記跑道是否被佔用 private Semaphore semaphore = new Semaphore(5, true); //獲取一個跑道 public String getTrack() throws InterruptedException { semaphore.acquire(1); return getNextAvailableTrack(); } //返回一個跑道 public void releaseTrack(String track) { if (makeAsUnused(track)) semaphore.release(1); } //遍歷,找到一個沒人用的跑道 private String getNextAvailableTrack() { for (int i = 0; i < used.length; i++) { if (!used[i]) { used[i] = true; return tracks[i]; } } return null; } //釋放跑道,將使用標誌設置爲false private boolean makeAsUnused(String track) { for (int i = 0; i < used.length; i++) { if (tracks[i].equals(track)) { if (used[i]) { used[i] = false; return true; } else { return false; } } } return false; } public static void main(String[] args) { Executor executor = Executors.newCachedThreadPool(); Playground playground = new Playground(); Runnable runnable = ()->{ try { String track = playground.getTrack();//獲取跑道 if (track != null) { System.out.println("學生" + Thread.currentThread().getId() + "在" + track.toString() + "上跑步"); TimeUnit.SECONDS.sleep(2); System.out.println("學生" + Thread.currentThread().getId() + "釋放" + track.toString()); playground.releaseTrack(track);//釋放跑道 } } catch (InterruptedException e) { e.printStackTrace(); } }; for (int i = 0; i < 100; i++) { executor.execute(runnable); } } }
public Semaphore(int permits) { sync = new NonfairSync(permits);//提供許可數量,默認爲非公平模式 } public Semaphore(int permits, boolean fair) { //提供許可數量,指定是否爲公平模式 sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore內部基於AQS的共享模式,因此實現都委託給了Sync類。.net
NonfairSync(int permits) { super(permits); }
Sync(int permits) { setState(permits);//AQS的state表示許可證的數量 }
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //若是線程被中斷了,拋出異常 if (Thread.interrupted()) throw new InterruptedException(); //獲取許可失敗,將線程加入到等待隊列中 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
AQS子類若是要使用共享模式的話,須要實現tryAcquireShared方法,下面看NonfairSync的該方法實現:線程
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
該方法調用了父類中的nonfairTyAcquireShared方法,以下:code
final int nonfairTryAcquireShared(int acquires) { for (;;) { //獲取剩餘許可數量 int available = getState(); //計算給完此次許可數量後的個數 int remaining = available - acquires; //若是許可不夠(獲取許可失敗)或者能夠將許可數量重置(獲取許可成功)的話,返回。 //只有在許可不夠時返回值纔會小於0,其他返回的都是剩餘許可數量,這也就解釋了,一旦許可不夠,後面的線程將會阻塞。 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
protected int tryAcquireShared(int acquires) { for (;;) { //若是前面有線程再等待,直接返回-1 if (hasQueuedPredecessors()) return -1; //後面與非公平同樣 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
FairSync與NonFairSync的區別就在於會首先判斷當前隊列中有沒有線程在等待,若是有,就老老實實進入到等待隊列;而不像NonfairSync同樣首先試一把,說不定就剛好得到了一個許可,這樣就能夠插隊了。對象
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
releaseShared方法在AQS中,以下:blog
public final boolean releaseShared(int arg) { //若是改變許可數量成功 if (tryReleaseShared(arg)) { doReleaseShared();//一旦CAS改變許可數量成功,就調用該方法釋放阻塞的線程。 return true; } return false; }
AQS子類實現共享模式的類須要實現tryReleaseShared類來判斷是否釋放成功,實現以下:隊列
protected final boolean tryReleaseShared(int releases) { for (;;) { //獲取當前許可數量 int current = getState(); //計算回收後的數量 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS改變許可數量成功,返回true if (compareAndSetState(current, next)) return true; } }
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
能夠看到,委託給了Sync,Sync的reducePermits方法以下:資源
final void reducePermits(int reductions) { for (;;) { //獲得當前剩餘許可數量 int current = getState(); //獲得減完以後的許可數量 int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); //若是CAS改變成功 //CAS改變AQS中的state變量,由於該變量表明許可證的數量。 if (compareAndSetState(current, next)) return; } }
Semaphore還能夠一次將剩餘的許可數量所有取走,該方法是drain方法,以下:rem
public int drainPermits() { return sync.drainPermits(); }
Sync的實現以下:
final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0))//CAS將許可數量置爲0。 return current; } }
Semaphore是信號量,用於管理一組資源。其內部是基於AQS的共享模式,AQS的狀態表示許可證的數量,在許可證數量不夠時,線程將會被掛起;而一旦有一個線程釋放一個資源,那麼就有可能從新喚醒等待隊列中的線程繼續執行。
參考地址: