Semaphore[ˈseməfɔ:(r)]意爲信號量,比較書面的解釋是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。
Semaphore維護了信號量許可,線程只有得到了許可纔可以訪問資源,能夠把Semaphore理解爲風景區管理員,風景區有人數限制,達到了人數限制管理員就會讓後來的遊客等着直到風景區裏面的遊客離開,這裏風景區至關於須要協調的公共資源,人數限制就至關於Semaphore維護的許可量,而遊客就至關因而執行任務的線程。node
Semaphore是基於共享鎖實現的,內部類Sync是同步器AQS的子類,Sync有兩個子類:公平信號量FairSync和非公平信號量NonefairSync,Semaphore默認非公平策略。數據結構
public class SemaphoreTest { private static final int THREAD_COUNT = 10; private static ExecutorService pool = Executors.newFixedThreadPool(THREAD_COUNT); public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < THREAD_COUNT; i++) { pool.execute(() -> { try { semaphore.acquire(); System.out.println("Thread " + Thread.currentThread().getId() + " is saving data"); Thread.sleep(1000); System.out.println("Thread " + Thread.currentThread().getId() + " finished"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); } pool.shutdown(); } }
運行結果:
函數
結果代表:同時只有三個線程能執行,由於Semaphore許可只有3個,至關於只有三個現場能訪問同步資源,只有當線程釋放許可,其餘線程才能獲取許可訪問同步資源。oop
Semaphore提供兩種構造函數,默認是非公平策略的,會根據傳入的許可permits設置同步狀態state。ui
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync; public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; //根據permits設置AQSstate Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } } }
Semaphore提供的獲取許可permits方法有acquire方法,都是調用的Sync分類AQS的acquireSharedInterruptibly方法,首先介紹基於公平策略如何獲取信號量的。線程
//獲取一個許可 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //獲取多個許可 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //AQS方法 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //有中斷則拋出異常 if (Thread.interrupted()) throw new InterruptedException(); //嘗試獲取「共享鎖」;獲取成功則直接返回,獲取失敗,則經過doAcquireSharedInterruptibly()獲取。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
Sync子類FairSync實現的tryAcquireShared方法,首先判斷AQS同步隊列還有沒有其餘正在等待的線程,若是當前線程前面沒有等待線程,嘗試CAS修改,採用的是循環+CAS的方式修改同步狀態code
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; //目前還有多少量可 int available = getState(); //當前線程得到acquires個許可後剩下的許可 int remaining = available - acquires; //剩下的許可大於0,CAS修改 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
若是嘗試獲取失敗,就會調用AQS的doAcquireSharedInterruptibly方法,首先會以當前線程構成共享型Node節點加入同步隊列尾部,若是上一個節點是head節點,就嘗試獲取共享鎖,不然就進入等待狀態,等待前繼節點成爲head節點釋放共享鎖並喚醒後繼節點。blog
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 建立」當前線程「的Node節點,且Node中記錄的鎖是」共享鎖「類型;並將該節點添加到AQS同步隊列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //當前節點的上一個節點p final Node p = node.predecessor(); //節點p是頭節點就嘗試修改同步狀態 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 當前線程一直等待,直到獲取到共享鎖。 // 若是線程在等待過程當中被中斷過,則再次中斷該線程(還原以前的中斷狀態)。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
信號量的釋放,本質上就是釋放獲取到的共享鎖。與acquire方法對應,釋放信號量也有兩種release方法,都調用了AQS的releaseShared方法。隊列
//釋放一個許可 public void release() { sync.releaseShared(1); } //釋放多個許可 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } //AQS方法 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared方法是有內部類Sync提供實現的,意味着公平方式與非公平方式釋放共享鎖的實現相同的。循環+CAS修改同步狀態。
protected final boolean tryReleaseShared(int releases) { for (;;) { //當前同步狀態/許可數 int current = getState(); //釋放了releases個許可後剩餘的許可數 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS修改同步狀態 if (compareAndSetState(current, next)) return true; } }
tryReleaseShared成功釋放後,doReleaseShared喚醒等待線程
private void doReleaseShared() { for (;;) { //頭節點 Node h = head; // 若是頭節點不爲null,而且頭節點不等於tail節點。同步隊列除了head節點還有其餘等待節點 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
非公平方式獲取以及釋放信號量的實現與公平方式只有tryAcquireShared的實現不一樣,釋放的邏輯是相同的。
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
基於共享鎖實現的Semaphore能夠控制必定數量的線程同時訪問同步資源,超過數量的線程須要等待直到有線程完成操做釋放許可,從而保證合理使用同步資源。