Semaphore:實現一個限流器

關注微信公衆號 JavaStormjava

Semaphore 如今廣泛翻譯成 "信號量",從概念上講信號量維護着一組 "憑證",獲取到憑證的線程才能訪問資源,使用完成後釋放, 咱們可使用信號量來限制訪問特定資源的併發線程數。數據庫

就像現實生活中的停車場車位,當有空位的時候才能放車子進入,否則就只能等待,出來的車子則釋放憑證。安全

信號量模型

能夠簡單的歸納爲:一個計數器、一個等待隊列、三個方法。 在信號量模型裏,計數器和等待隊列對外是透明的,只能經過信號量模型提供的三個方法訪問它們,init()、acquire()、release()微信

  • init(): 設置計數器的初始值,初始化憑證數量。能夠理解爲停車場的車位數量。
  • acquire():計數器的值減 1 ;若是此時計數器的值小於 0,則當前線程將被阻塞,放到等待隊列之中,不然當前線程能夠繼續執行。
  • release():計數器值加 1;若是此時計數器的值小於或者等於 0,則喚醒等待隊列中的一個線程,並將其從等待隊列中移除。

這裏提到的 init()、acquire()、release() 三個方法都是原子性的,而且這個原子性是由信號量模型的實現方保證的。在 Java SDK 裏面,信號量模型是由 java.util.concurrent.Semaphore 實現的,Semaphore 這個類可以保證這三個方法都是原子操做。併發

經過一個簡化版的信號模型代碼便於理解:app

public class Semaphore {
    //計數器
    private int count;
    //保存線程的等待隊列
    private Queue queue;

    /** * 初始化計數器 * @param count */
    public Semaphore(int count) {
        this.count = count;
    }

    /** * 獲取憑證 */
    public void acquire(){
        this.count--;
        if(this.count<0){
            // 將當前線程插入等待隊列
            // 阻塞當前線程
        }
    }

    /** * 釋放憑證 */
    public void release(){
        this.count++;
        if(this.count >= 0) {
            // 移除等待隊列中的某個線程 T
            // 喚醒線程 T
        }
    }
}

複製代碼

信號量的使用

經過上文咱們瞭解到信號量模型原理,接下來則看如何在實際場景中使用。這裏咱們仍是用累加器的例子來講明信號量的使用吧。在累加器的例子裏面,count++ 操做是個臨界區,只容許一個線程執行,也就是說要保證互斥。函數

public class TestSemaPhore {
    private static int count;
    //初始化信號量爲 1
    private static final Semaphore semaphore = new Semaphore(1);

    public static void addOne() throws InterruptedException {
        //使用信號量保證互斥,只有一個線程進入
        semaphore.acquire();
        try {
            count++;
        } finally {
            semaphore.release();
        }
    }

    public static int getCount() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        //模擬十個線程同時訪問
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    addOne();
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        countDownLatch.countDown();
        TimeUnit.SECONDS.sleep(3);
        int count = getCount();
        System.out.println(count);
    }

}
複製代碼

咱們來分析下信號量如何保證互斥的。ui

假設兩個線程 T1 和 T2 同時訪問 addOne(),當他們都調用semaphore.acquire();的時候,因爲這是一個原子操做,因此只有一個線程能把信號量計數器減爲 0,另一個線程 T2 則是將計數器減爲 -1。對應線程 T1 計數器的值爲 0 ,知足大於等於 0,因此線程 T1 會繼續執行;對於線程 T2,信號量計數器的值爲 -1,小於 0 ,按照咱們以前的信號量模型 acquire()描述,線程 T2 將被阻塞進入等待隊列。因此此刻只有線程 T1 進入臨界區執行 count++this

當前信號量計數器的值爲 -1 ,當線程 T1 執行 semaphore.release()操做執行完後 計數器 +1 則變成了 0,知足小於等於 0,按照模型的定義,此刻等待隊列中的 T2 將會被喚醒,因而 T2 在 T1 執行完臨界區代碼後纔得到進入代碼領截取的機會,從而保證了互斥性。spa

實現一個限流器

上面的例子咱們利用信號量實現了一個簡單的互斥鎖,你會不會以爲奇怪,既然 Java SDK 裏面提供了 Lock,爲啥還要提供一個 Semaphore ?其實 Semaphore 還有一個功能是 Lock 不容易實現的,那就是:Semaphore 能夠容許多個線程訪問一個臨界區

常見的就是池化資源,好比鏈接池、對象池、線程池等。好比熟悉的數據庫鏈接池,在同一時刻容許多個線程同時使用鏈接,固然在每一個鏈接被釋放以前,是容許其餘線程使用的。

如今咱們假設有一個場景,對象池需求,就是一次性建立 N 哥對象,以後全部的線程都複用這 N 個對象,在對象被釋放前,是不容許其餘線程使用。

/** * 對象池 * */
public class ObjectPool {
    //使用 阻塞隊列保存對象池
    private final ArrayBlockingQueue<InputSaleMapDO> pool;
    //信號量
    private final Semaphore sem;

    /** * 初始化對象池 * * @param size 池大小 */
    public ObjectPool(int size) {
        pool = new ArrayBlockingQueue<>(size);
        sem = new Semaphore(size);
        for (int i = 0; i < size; i++) {
            InputSaleMapDO inputSaleMapDO = new InputSaleMapDO();
            inputSaleMapDO.setId((long) i);
            pool.add(inputSaleMapDO);
        }
    }

    //利用對象池的對象調用 function
    public Long run(Function<InputSaleMapDO, Long> function) throws InterruptedException {
        InputSaleMapDO obj = null;
        sem.acquire();
        try {
            obj = pool.poll();
            return function.apply(obj);
        } finally {
            pool.add(obj);
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ObjectPool objectPool = new ObjectPool(2);

        //模擬十個線程同時訪問
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    objectPool.run(f -> {
                        System.out.println(f);
                        return f.getId();
                    });
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        countDownLatch.countDown();
        TimeUnit.SECONDS.sleep(30);
    }
}

複製代碼

初始化線程池大小 2 ,咱們模擬 10 個線程,每次只能兩個線程分配對象 InputSaleMapDO。

執行完回調函數以後,它們就會釋放對象(這個釋放工做是經過 pool.add(obj) 實現的),同時調用 release() 方法來更新信號量的計數器。若是此時信號量裏計數器的值小於等於 0,那麼說明有線程在等待,此時會自動喚醒等待的線程。

思考

上面的例子中 保存對象池使用了 ArrayBlockingQueue ,是一個線程安全的容器,那麼是否能夠換成 ArrayList?歡迎後臺給出答案。還有假設是停車場的車位做爲對象池,車主停車是否是也可使用 Semaphore 實現?

JavaStorm
相關文章
相關標籤/搜索