系列文章目錄 java
Semaphore(信號量)也是經常使用的併發工具之一,它經常用於流量控制。一般狀況下,公共的資源經常是有限的,例如數據庫的鏈接數。使用Semaphore能夠幫助咱們有效的管理這些有限資源的使用。數據庫
Semaphore的結構和ReentrantLock以及CountDownLatch很像,內部採用了公平鎖與非公平鎖兩種實現,若是你已經看過了ReentrantLock源碼分析 和 CountDownLatch源碼分析,弄懂它將絕不費力。segmentfault
與CountDownLatch相似,Semaphore主要是經過AQS的共享鎖機制實現的,所以它的核心屬性只有一個sync,它繼承自AQS:數組
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { //省略 } protected final boolean tryReleaseShared(int releases) { // } final void reducePermits(int reductions) { //省略 } final int drainPermits() { //省略 } }
這裏的permits
和CountDownLatch的count
很像,它們最終都將成爲AQS中的state
屬性的初始值。併發
Semaphore有兩個構造函數:框架
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
默認的構造函數使用的是非公平鎖,另外一個構造函數經過傳入的fair
參數來決定使用公平鎖仍是非公平鎖,這一點和ReentrantLock用的是一樣的套路,都是一樣的代碼框架。函數
公平鎖和非公平鎖的定義以下:工具
static final class FairSync extends Sync { FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
獲取信號量的方法有4個:源碼分析
acquire方法 | 本質調用 |
---|---|
acquire() |
sync.acquireSharedInterruptibly(1) |
acquire(int permits) |
sync.acquireSharedInterruptibly(permits) |
acquireUninterruptibly() |
sync.acquireShared(1) |
acquireUninterruptibly(int permits) |
sync.acquireShared(permits); |
可見,acquire()
方法就至關於acquire(1)
,acquireUninterruptibly
同理,只不過一種響應中斷,一種不響應中斷,關於AQS的那四個方法咱們在前面的文章中都已經分析過了,除了其中的tryAcquireShared(arg)
由子類實現外,其餘的都由AQS實現。ui
值得注意的是,在逐行分析AQS源碼(3)——共享鎖的獲取與釋放中咱們特別提到過tryAcquireShared
返回值的含義:
這裏的返回值其實表明的是剩餘的信號量的值,若是爲負值則說明信號量不夠了。
接下來咱們就看看子類對於tryAcquireShared(arg)
方法的實現:
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
與通常的tryAcquire邏輯不一樣,Semaphore的tryAcquire邏輯是一個自旋操做,由於Semaphore是共享鎖,同一時刻可能有多個線程來修改這個值,因此咱們必須使用自旋 + CAS
來避免線程衝突。
該方法退出的惟一條件是成功的修改了state值,並返回state的剩餘值。若是剩下的信號量不夠了,則就不須要進行CAS操做,直接返回剩餘值。因此其實tryAcquireShared返回的不是當前剩餘的信號量的值,而是若是扣去acquires以後,當前將要剩餘的信號量的值,若是這個「將要」剩餘的值比0小,則是不會發生扣除操做的。這就比如我要買10個包子,包子鋪如今只剩3個了,則將會返回剩餘3 - 10 = -7
個包子,可是事實上包子店並無將包子賣出去,實際剩餘的包子仍是3個;此時若是有另外一我的來只要買1個包子,則將會返回剩餘3 - 1 = 2
個包子,而且包子店會將一個包子賣出,實際剩餘的包子數也是2個。
非公平鎖的這種獲取信號量的邏輯其實和CountDownLatch的countDown方法很像:
// CountDownLatch public void countDown() { sync.releaseShared(1); }
在countDown()
的releaseShared(1)
方法中將調用tryReleaseShared
:
// CountDownLatch protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
對比CountDownLatch的tryReleaseShared
方法和Semaphore的tryAcquireShared
方法可知,它們的核心邏輯都是減小state的值,只不過CountDownLatch借用了共享鎖的殼,對它而言,減小state的值是一種釋放共享鎖的行爲,由於它的目的是將state值降爲0;而在Semaphore中,減小state的值是一種獲取共享鎖的行爲,減小成功了,則獲取成功。
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
經過對比能夠看出,它和nonfairTryAcquireShared的惟一的差異在於:
if (hasQueuedPredecessors()) return -1;
即在獲取共享鎖以前,先用hasQueuedPredecessors
方法判斷有沒有人排在本身前面。關於hasQueuedPredecessors
方法,咱們在前面的文章中已經分析過了,它就是判斷當前節點是否有前驅節點,有的話直接返回獲取失敗,由於要讓前驅節點先去獲取鎖。(畢竟公平鎖講究先來後到嘛)
釋放信號量的方法有2個:
public void release() { sync.releaseShared(1); }
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
可見,release()
至關於調用了 release(1)
,它們最終都調用了tryReleaseShared(int releases)
方法:
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
與獲取信號量的邏輯相反,釋放信號量的邏輯是將獲得的信號量再歸還回去,所以是增長state值的操做,代碼自己很容易理解,這裏再也不贅述。
除了以上獲取和釋放信號量所用到的方法,Semaphore還定義了一些其餘方法來幫助咱們操做信號量:
注意,這個tryAcquire
不是給acquire方法使用的!!!咱們上面分析信號量的獲取時說過,獲取信號量的acquire方法調用的是AQS的acquireShared
和acquireSharedInterruptibly
,而這兩個方法會調用子類的tryAcquireShared
方法,子類必須實現這個方法。
而這裏的tryAcquire
方法並無定義在AQS的子類中,即既不在NonfairSync
,也不在FairSync
中,對於使用共享鎖的AQS的子類,也不須要定義這個方法。事實上它直接定義在Semaphore中的。
因此,在看這個方法時,腦海中必定要有一個意識,雖然它和AQS的獨佔鎖的獲取邏輯中的tryAcquire
重名了,但實際上它和AQS的獨佔鎖是沒有關係的,不要被它的名字繞暈了。
那麼,這個tryAcquire
和tryAcquireShared
方法有什麼不一樣呢?只要有兩點:
tryAcquire
返回boolean
類型,tryAcquireShared
返回int
tryAcquire
必定是採用非公平鎖模式,而tryAcquireShared
有公平和非公平兩種實現。理清楚以上幾點以後,咱們再來看tryAcquire方法的源碼,它有四種重載形式:
兩種不帶超時機制的形式:
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; }
public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; }
兩種帶超時機制的形式:
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
其中,不帶超時機制的tryAcquire
方法實際上調用的就是nonfairTryAcquireShared(int acquires)
方法,它和非公平鎖的tryAcquireShared
同樣,只是tryAcquireShared
是直接return nonfairTryAcquireShared(acquires)
,而tryAcquire
是return sync.nonfairTryAcquireShared(1) >= 0;
,即直接返回獲取鎖的操做是否成功。
而帶超時機制的tryAcquire
方法提供了一種超時等待的方式,這是前面介紹的公平鎖和非公平鎖的獲取鎖邏輯中所沒有的,它本質上調用了AQS的tryAcquireSharedNanos(int arg, long nanosTimeout)
方法:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
這個方法咱們在介紹CountDownLatch源碼分析的await(long timeout, TimeUnit unit)
方法時已經分析過了,屬於老套路了,這裏就不展開了。
reducePermits方法用來減小信號量的總數,這在debug中是頗有用的,它與前面介紹的acquire方法的不一樣點在於,即便當前信號量的值不足,它也不會致使調用它的線程阻塞等待。只要須要減小的信號量的數量reductions
大於0,操做最終就會成功,也就是說,即便當前的reductions大於現有的信號量的值也不要緊,因此該方法可能會致使剩餘信號量爲負值。
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } }
咱們將它和nonfairTryAcquireShared對比一下:
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
能夠看出,二者在CAS前的判斷條件並不相同,reducePermits只要剩餘值不比當前值大就能夠,而nonfairTryAcquireShared必需要保證剩餘值不小於0纔會執行CAS操做。
相比reducePermits,drainPermits就更簡單了,它直接將剩下的信號量一次性消耗光,而且返回所消耗的信號量,這個方法在debug中也是頗有用的:
public int drainPermits() { return sync.drainPermits(); }
final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }
以上咱們分析了信號量的源碼,接下來咱們來分析一下官方給的一個使用的例子:
class Pool { private static final int MAX_AVAILABLE = 100; // 初始化一個信號量,設置爲公平鎖模式,總資源數爲100個 private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { // 獲取一個信號量 available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo protected Object[] items = ...whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } }
這個例子很簡單,咱們用items數組表明可用的資源,用used數組來標記已經使用的資源的,used[i]
的值爲true,則表明items[i]
這個資源已經被使用了。
(1) 獲取一個可用資源
咱們調用getItem()
來獲取資源,在該方法中會先調用available.acquire()
方法請求一個信號量,注意,這裏若是當前信號量數不夠時,是會阻塞等待的;當咱們成功地獲取了一個信號量以後,將會調用getNextAvailableItem
方法,返回一個可用的資源。
(2) 釋放一個資源
咱們調用putItem(Object x)
來釋放資源,在該方法中會先調用markAsUnused(Object item)
將須要釋放的資源標記成可用狀態(即將used數組中對應的位置標記成false), 若是釋放成功,咱們就調用available.release()
來釋放一個信號量。
Semaphore是一個有效的流量控制工具,它基於AQS共享鎖實現。咱們經常用它來控制對有限資源的訪問。每次使用資源前,先申請一個信號量,若是資源數不夠,就會阻塞等待;每次釋放資源後,就釋放一個信號量。
(完)