本文首發於一世流雲的專欄: https://segmentfault.com/blog...
Semaphore
,又名信號量,這個類的做用有點相似於「許可證」。有時,咱們由於一些緣由須要控制同時訪問共享資源的最大線程數量,好比出於系統性能的考慮須要限流,或者共享資源是稀缺資源,咱們須要有一種辦法可以協調各個線程,以保證合理的使用公共資源。java
Semaphore維護了一個許可集,其實就是必定數量的「許可證」。
當有線程想要訪問共享資源時,須要先獲取(acquire)的許可;若是許可不夠了,線程須要一直等待,直到許可可用。當線程使用完共享資源後,能夠歸還(release)許可,以供其它須要的線程使用。segmentfault
另外,Semaphore支持公平/非公平策略,這和ReentrantLock相似,後面講Semaphore原理時會看到,它們的實現自己就是相似的。數組
咱們來看下Oracle官方給出的示例:併發
class Pool { private static final int MAX_AVAILABLE = 100; // 可同時訪問資源的最大線程數 private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); protected Object[] items = new Object[MAX_AVAILABLE]; //共享資源 protected boolean[] used = new boolean[MAX_AVAILABLE]; public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } private synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; } private synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } }
items數組能夠當作是咱們的共享資源,當有線程嘗試使用共享資源時,咱們要求線程先得到「許可」(調用Semaphore 的acquire方法),這樣線程就擁有了權限,不然就須要等待。當使用完資源後,線程須要調用Semaphore 的release方法釋放許可。框架
注意:上述示例中,對於共享資源訪問須要由鎖來控制,Semaphore僅僅是保證了線程由權限使用共享資源,至於使用過程當中是否由併發問題,須要經過鎖來保證。
總結一下,許可數 ≤ 0表明共享資源不可用。許可數 > 0,表明共享資源可用,且多個線程能夠同時訪問共享資源。性能
這是否是和CountDownLatch有點像?
咱們來比較下:ui
同步器 | 做用 |
---|---|
CountDownLatch | 同步狀態State > 0 表示資源不可用,全部線程須要等待;State == 0 表示資源可用,全部線程能夠同時訪問 |
Semaphore | 剩餘許可數 < 0 表示資源不可用,全部線程須要等待; 許可剩餘數 ≥ 0 表示資源可用,全部線程能夠同時訪問 |
若是讀者閱讀過本系列的AQS相關文章,應該立馬能夠反應過來,這其實就是對同步狀態的定義不一樣。
CountDownLatch內部實現了AQS的共享功能,那麼 Semaphore是否也同樣是利用內部類實現了AQS的共享功能呢?
咱們先來看下Semaphore的內部:spa
能夠看到,Semaphore果真是經過內部類實現了AQS框架提供的接口,並且基本結構幾乎和ReentrantLock徹底同樣,經過內部類分別實現了公平/非公平策略。線程
Semaphore sm = new Semaphore (3, true);
設計
Semaphore有兩個構造器:
構造器1:
構造器2:
構造時須要指定「許可」的數量——permits,內部結構以下:
咱們仍是經過示例來分析:
假設如今一共3個線程: ThreadA、 ThreadB、 ThreadC。一個許可數爲2的公平策略的 Semaphore。線程的調用順序以下:
Semaphore sm = new Semaphore (2, true); // ThreadA: sm.acquire() // ThreadB: sm.acquire(2) // ThreadC: sm.acquire() // ThreadA: sm.release() // ThreadB: sm.release(2)
Semaphore sm = new Semaphore (2, true);
能夠看到,內部建立了一個FairSync對象,並傳入許可數permits:
Sync是Semaphore的一個內部抽象類,公平策略的FairSync和非公平策略的NonFairSync都繼承該類。
能夠看到,構造器傳入的permits值就是同步狀態的值,這也體現了咱們在AQS系列中說過的:
AQS框架的設計思想就是分離構建同步器時的一系列關注點,它的全部操做都圍繞着資源——同步狀態(synchronization state)來展開,並將資源的定義和訪問留給用戶解決:
Semaphore的acquire方法內部調用了AQS的方法,入參"1"表示嘗試獲取1個許可:
AQS的acquireSharedInterruptibly方式是共享功能的一部分,咱們在AQS系列中就已經對它很熟悉了:
關鍵來看下Semaphore是如何實現tryAcquireShared方法的:
對於 Semaphore來講,線程是能夠一次性嘗試獲取多個許可的,此時只要剩餘的許可數量夠,最終會經過自旋操做更新成功。若是剩餘許可數量不夠,會返回一個負數,表示獲取失敗。
顯然,ThreadA獲取許可成功。此時,同步狀態值State == 1
,等待隊列的結構以下:
帶入參的aquire方法內部和無參的同樣,都是調用了AQS的acquireSharedInterruptibly方法:
此時,ThreadB同樣進入tryAcquireShared方法。不一樣的是,此時剩餘許可數不足,由於ThreadB一次性獲取2個許可,tryAcquireShared方法返回一個負數,表示獲取失敗:remaining = available - acquires = 1- 2 = -1;
ThreadB會調用doAcquireSharedInterruptibly方法:
上述方法首先經過addWaiter方法將ThreadB包裝成一個共享結點,加入等待隊列:
而後會進入自旋操做,先嚐試獲取一次資源,顯然此時是獲取失敗的,而後判斷是否要進入阻塞(shouldParkAfterFailedAcquire):
上述方法會先將前驅結點的狀態置爲SIGNAL,表示ThreadB須要阻塞,但在阻塞以前須要將前驅置爲SIGNAL,以便未來能夠喚醒ThreadB。
最終ThreadB會在parkAndCheckInterrupt中進入阻塞:
此時,同步狀態值依然是State == 1
,等待隊列的結構以下:
流程和步驟3徹底相同,ThreadC被包裝成結點加入等待隊列後:
同步狀態:State == 1
Semaphore的realse方法調用了AQS的releaseShared方法,默認入參爲"1",表示歸還一個許可:
來看下Semaphore是如何實現tryReleaseShared方法的,tryReleaseShared方法是一個自旋操做,直到更新State成功:
更新完成後,State == 2
,ThreadA會進入doReleaseShared方法,先將頭結點狀態置爲0,表示即將喚醒後繼結點:
此時,等待隊列結構:
而後調用unparkSuccessor方法喚醒後繼結點:
此時,ThreadB被喚醒,會從原阻塞處繼續向下執行:
此時,同步狀態:State == 2
ThreadB被喚醒後,從下面開始繼續往下執行,進入下一次自旋:
在下一次自旋中,ThreadB調用tryAcquireShared方法成功獲取到共享資源(State修改成0),setHeadAndPropagate方法把ThreadB變爲頭結點,
並根據傳播狀態判斷是否要喚醒並釋放後繼結點:
同步狀態:State == 0
ThreadB會調用doReleaseShared方法,繼續嘗試喚醒後繼的共享結點(也就是ThreadC),這個過程和ThreadB被喚醒徹底同樣:
同步狀態:State == 0
因爲目前共享資源仍爲0,因此ThreadC被喚醒後,在通過嘗試獲取資源失敗後,又進入了阻塞:
內部和無參的release方法同樣:
更新完成後,State == 2
,ThreadA會進入doReleaseShared方法,喚醒後繼結點:
此時,等待隊列結構:
同步狀態:State == 2
因爲目前共享資源爲2,因此ThreadC被喚醒後,獲取資源成功:
最終同步隊列的結構以下:
同步狀態:State == 0
Semaphore其實就是實現了AQS共享功能的同步器,對於Semaphore來講,資源就是許可證的數量:
這裏共享的含義是多個線程能夠同時獲取資源,當計算出的剩餘資源不足時,線程就會阻塞。
注意:Semaphore不是鎖,只能限制同時訪問資源的線程數,至於對數據一致性的控制,Semaphore是不關心的。當前,若是是隻有一個許可的Semaphore,能夠看成鎖使用。
另外,上述咱們討論的是Semaphore的公平策略,非公平策略的差別並不大:
能夠看到,非公平策略不會去查看等待隊列的隊首是否有其它線程正在等待,而是直接嘗試修改State值。
Semaphore還有兩個比較特殊的方法,這兩個方法的特色是採用自旋操做State變量,直到成功爲止。因此,並不會阻塞調用線程。
reducePermits
reducePermits當即減小指定數目的可用許可數。
drainPermits
drainPermits方法用於將可用許可數清零,並返回清零前的許可數