咱們使用semaphore
去限制獲取特定資源的併發線程數量。
下面的例子中,咱們實現了一個簡單的登陸隊列來限制登入系統的用戶數量:java
class LoginQueueUsingSemaphore { private Semaphore semaphore; public LoginQueueUsingSemaphore(int slotLimit) { semaphore = new Semaphore(slotLimit); } boolean tryLogin() { return semaphore.tryAcquire(); } void logout() { semaphore.release(); } int availableSlots() { return semaphore.availablePermits(); } }
注意下咱們使用這些方法的方式:git
咱們來測試一下咱們的登陸隊列,咱們首先使用完全部的permit,而後再獲取一個看看是否會被阻塞:github
@Test public void givenLoginQueue_whenReachLimit_thenBlocked() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); assertFalse(loginQueue.tryLogin()); }
如今咱們logout()一下看看是否有可用的permit:apache
@Test public void givenLoginQueue_whenLogout_thenSlotsAvailable() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); loginQueue.logout(); assertTrue(loginQueue.availableSlots() > 0); assertTrue(loginQueue.tryLogin()); }
結果顯而易見。併發
咱們如今看看ApacheCommons下實現的TimedSemaphore。TimedSemaphore容許在既定的時間內維護必定數量的Semaphore(這段時間內和JDK實現的Semaphore效果同樣),當時間過去後會釋放全部的permits。測試
咱們能夠使用TimedSemaphore來構建一個簡單的延時隊列:ui
class DelayQueueUsingTimedSemaphore { private TimedSemaphore semaphore; DelayQueueUsingTimedSemaphore(long period, int slotLimit) { semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit); } boolean tryAdd() { return semaphore.tryAcquire(); } int availableSlots() { return semaphore.getAvailablePermits(); } }
如今咱們設置超時時間1秒,在1秒鐘以內使用完全部的permit再次嘗試獲取的時候就會沒有可用的permit:this
public void givenDelayQueue_whenReachLimit_thenBlocked() { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); assertFalse(delayQueue.tryAdd()); }
可是把線程休眠1秒後,這時候semaphore會重置並釋放全部的permits
:線程
@Test public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); Thread.sleep(1000); assertTrue(delayQueue.availableSlots() > 0); assertTrue(delayQueue.tryAdd()); }
Mutex像是一個二進制的Semaphore,咱們能夠使用它來實現互斥。
在下面的這個例子中,咱們使用一個permit爲1的Semaphore來構建一個計數器:code
class CounterUsingMutex { private Semaphore mutex; private int count; CounterUsingMutex() { mutex = new Semaphore(1); count = 0; } void increase() throws InterruptedException { mutex.acquire(); this.count = this.count + 1; Thread.sleep(1000); mutex.release(); } int getCount() { return this.count; } boolean hasQueuedThreads() { return mutex.hasQueuedThreads(); } }
當大量線程同時來操做counter的時候,他們都會在隊列中阻塞:
@Test public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); }
咱們把線程休眠一會後,全部的線程都將能操做counter,這時隊列中就沒有等待排隊的線程了。
@Test public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); Thread.sleep(5000); assertFalse(counter.hasQueuedThreads()); assertEquals(count, counter.getCount()); }