計數信號量用來控制同時訪問某個特定資源的操做數量,或者同時執行某個指定操做的數量。計數信號量還能夠用來實現某種資源池,或者對容器施加邊界,如數據庫鏈接池,固定大小的數據集。
Semaphore 管理一組虛擬的許可,在執行操做的時能夠先得到許可,並在使用之後釋放許可。若是沒有許可,那麼 acquire 將阻塞直到有許可,release 方法返回一個可用信號量。Semaphore 內部是經過 AQS 來實現的。
Semaphore 主要實現方法:java
Semapore 結構:node
public class Semaphore implements Serializable { private static final long serialVersionUID = -3222578661600680210L; private final Semaphore.Sync sync; public Semaphore(int var1) { this.sync = new Semaphore.NonfairSync(var1); } public Semaphore(int var1, boolean var2) { this.sync = (Semaphore.Sync)(var2?new Semaphore.FairSync(var1):new Semaphore.NonfairSync(var1)); } public void acquire() throws InterruptedException { this.sync.acquireSharedInterruptibly(1); } public void acquireUninterruptibly() { this.sync.acquireShared(1); } public boolean tryAcquire() { return this.sync.nonfairTryAcquireShared(1) >= 0; } public boolean tryAcquire(long var1, TimeUnit var3) throws InterruptedException { return this.sync.tryAcquireSharedNanos(1, var3.toNanos(var1)); } public void release() { this.sync.releaseShared(1); } public void acquire(int var1) throws InterruptedException { if(var1 < 0) { throw new IllegalArgumentException(); } else { this.sync.acquireSharedInterruptibly(var1); } } public void acquireUninterruptibly(int var1) { if(var1 < 0) { throw new IllegalArgumentException(); } else { this.sync.acquireShared(var1); } } public boolean tryAcquire(int var1) { if(var1 < 0) { throw new IllegalArgumentException(); } else { return this.sync.nonfairTryAcquireShared(var1) >= 0; } } public boolean tryAcquire(int var1, long var2, TimeUnit var4) throws InterruptedException { if(var1 < 0) { throw new IllegalArgumentException(); } else { return this.sync.tryAcquireSharedNanos(var1, var4.toNanos(var2)); } } public void release(int var1) { if(var1 < 0) { throw new IllegalArgumentException(); } else { this.sync.releaseShared(var1); } } }
從 Semaphore 中咱們能夠看到 Semaphore 的鎖是經過 Sync 這個類完成的,Sync 則繼承自 AQS ,AQS 是獨佔鎖和共享鎖的父類,經過繼承 AQS 實現共享鎖。
Sync 有兩個子類是 FairSync 和 NonfairSync,分別表明公平鎖和非公平鎖。Semaphore 默認是非公平鎖。爲何使用非公平鎖,這個是性能上的考慮,若是每次都去喚醒線程去獲取信號,這是很是消耗資源的,非公平鎖的性能和吞吐量也明顯優於公平鎖。
FairSync 結構:數據庫
static final class FairSync extends Semaphore.Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int var1) { super(var1); } protected int tryAcquireShared(int var1) { int var2; int var3; do { if(this.hasQueuedPredecessors()) { return -1; } var2 = this.getState(); var3 = var2 - var1; } while(var3 >= 0 && !this.compareAndSetState(var2, var3)); return var3; } }
FairSync 判斷是否有線程等待,若是沒有則嘗試獲取信號,若是有則加入到等待隊列。
NonfariSync 結構:函數
static final class NonfairSync extends Semaphore.Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int var1) { super(var1); } protected int tryAcquireShared(int var1) { return this.nonfairTryAcquireShared(var1); } } abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int var1) { this.setState(var1); } final int getPermits() { return this.getState(); } final int nonfairTryAcquireShared(int var1) { int var2; int var3; do { var2 = this.getState(); var3 = var2 - var1; } while(var3 >= 0 && !this.compareAndSetState(var2, var3)); return var3; } protected final boolean tryReleaseShared(int var1) { int var2; int var3; do { var2 = this.getState(); var3 = var2 + var1; if(var3 < var2) { throw new Error("Maximum permit count exceeded"); } } while(!this.compareAndSetState(var2, var3)); return true; } final void reducePermits(int var1) { int var2; int var3; do { var2 = this.getState(); var3 = var2 - var1; if(var3 > var2) { throw new Error("Permit count underflow"); } } while(!this.compareAndSetState(var2, var3)); } final int drainPermits() { int var1; do { var1 = this.getState(); } while(var1 != 0 && !this.compareAndSetState(var1, 0)); return var1; } }
NonfairSync 則先嚐試獲取鎖,若是獲取失敗則再加入等待隊列。
AQS 結構:oop
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable { private static final long serialVersionUID = 7373984972572414691L; private transient volatile AbstractQueuedSynchronizer.Node head; private transient volatile AbstractQueuedSynchronizer.Node tail; private volatile int state; static final long spinForTimeoutThreshold = 1000L; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; protected AbstractQueuedSynchronizer() { } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 獲取失敗會被阻塞 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 設置等待線程頭部,並傳播喚醒下一個頭部等待線程 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // 喚醒等待線程隊首線程 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } private void doReleaseShared() { while(true) { AbstractQueuedSynchronizer.Node var1 = this.head; if(var1 != null && var1 != this.tail) { int var2 = var1.waitStatus; if(var2 == -1) { if(!compareAndSetWaitStatus(var1, -1, 0)) { continue; } this.unparkSuccessor(var1); } else if(var2 == 0 && !compareAndSetWaitStatus(var1, 0, -3)) { continue; } } if(var1 == this.head) { return; } } } }
Semaphore 調用 acquire方法時,調用 AQS 的 acquireSharedInterruptibly 方法,AQS 則調用子類的 tryAcquireShared 方法,若是獲取成功,則直接返回;若是獲取失敗,則調用調用 AQS 的方法 doAcquireSharedInterruptibly 阻塞並加入到等待隊列等待喚醒。
Semaphore 調用 release 方法時,調用 AQS 的 releaseShared 方法,AQS 則調用子類的 tryReleaseShared 方法,若是釋放成功,則調用 doReleaseShared 方法喚醒等待隊列隊首線程,線程啓動後,若是 tryAcquireShared 返回值大於等於 0,則經過 setHeadAndPropagate 方法進行傳播,喚醒下一個線程。性能