JUC併發工具包之Semaphore

Semaphore (JDK)

咱們使用semaphore去限制獲取特定資源的併發線程數量。
下面的例子中,咱們實現了一個簡單的登陸隊列來限制登入系統的用戶數量:java

class LoginQueueUsingSemaphore {
 
    private Semaphore semaphore;
 
    public LoginQueueUsingSemaphore(int slotLimit) {
        semaphore = new Semaphore(slotLimit);
    }
 
    boolean tryLogin() {
        return semaphore.tryAcquire();
    }
 
    void logout() {
        semaphore.release();
    }
 
    int availableSlots() {
        return semaphore.availablePermits();
    }
}

注意下咱們使用這些方法的方式:git

  • tryAcquire():若是還有可用的permit(構造方法傳入的,表示限制的線程數量)則當即返回true,不然返回false,可是acquire()方法會以阻塞的方式獲取一個permit。
  • release():釋放一個permit。
  • availablePermits():返回當前可用的permit數量。

咱們來測試一下咱們的登陸隊列,咱們首先使用完全部的permit,而後再獲取一個看看是否會被阻塞:github

@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();
 
    assertEquals(0, loginQueue.availableSlots());
    assertFalse(loginQueue.tryLogin());
}

如今咱們logout()一下看看是否有可用的permit:apache

@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();
    assertEquals(0, loginQueue.availableSlots());
    loginQueue.logout();
 
    assertTrue(loginQueue.availableSlots() > 0);
    assertTrue(loginQueue.tryLogin());
}

結果顯而易見。併發

Timed Semaphore (Apache Commons)

咱們如今看看ApacheCommons下實現的TimedSemaphore。TimedSemaphore容許在既定的時間內維護必定數量的Semaphore(這段時間內和JDK實現的Semaphore效果同樣),當時間過去後會釋放全部的permits。測試

咱們能夠使用TimedSemaphore來構建一個簡單的延時隊列:ui

class DelayQueueUsingTimedSemaphore {
 
    private TimedSemaphore semaphore;
 
    DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
        semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
    }
 
    boolean tryAdd() {
        return semaphore.tryAcquire();
    }
 
    int availableSlots() {
        return semaphore.getAvailablePermits();
    }
}

如今咱們設置超時時間1秒,在1秒鐘以內使用完全部的permit再次嘗試獲取的時候就會沒有可用的permit:this

public void givenDelayQueue_whenReachLimit_thenBlocked() {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue 
      = new DelayQueueUsingTimedSemaphore(1, slots);
     
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();
 
    assertEquals(0, delayQueue.availableSlots());
    assertFalse(delayQueue.tryAdd());
}

可是把線程休眠1秒後,這時候semaphore會重置並釋放全部的permits線程

@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();
 
    assertEquals(0, delayQueue.availableSlots());
    Thread.sleep(1000);
    assertTrue(delayQueue.availableSlots() > 0);
    assertTrue(delayQueue.tryAdd());
}

Semaphore vs. Mutex

Mutex像是一個二進制的Semaphore,咱們能夠使用它來實現互斥。
在下面的這個例子中,咱們使用一個permit爲1的Semaphore來構建一個計數器:code

class CounterUsingMutex {
 
    private Semaphore mutex;
    private int count;
 
    CounterUsingMutex() {
        mutex = new Semaphore(1);
        count = 0;
    }
 
    void increase() throws InterruptedException {
        mutex.acquire();
        this.count = this.count + 1;
        Thread.sleep(1000);
        mutex.release();
 
    }
 
    int getCount() {
        return this.count;
    }
 
    boolean hasQueuedThreads() {
        return mutex.hasQueuedThreads();
    }
}

當大量線程同時來操做counter的時候,他們都會在隊列中阻塞:

@Test
public void whenMutexAndMultipleThreads_thenBlocked()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();
 
    assertTrue(counter.hasQueuedThreads());
}

咱們把線程休眠一會後,全部的線程都將能操做counter,這時隊列中就沒有等待排隊的線程了。

@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();
 
    assertTrue(counter.hasQueuedThreads());
    Thread.sleep(5000);
    assertFalse(counter.hasQueuedThreads());
    assertEquals(count, counter.getCount());
}

CodeRepo

完整代碼在這

相關文章
相關標籤/搜索