JUC 中提供的限流利器-Semaphore(信號量)

在 JUC 包下,有一個 Semaphore 類,翻譯成信號量,Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。Semaphore 跟鎖(synchronized、Lock)有點類似,不一樣的地方是,鎖同一時刻只容許一個線程訪問某一資源,而 Semaphore 則能夠控制同一時刻多個線程訪問某一資源。java

Semaphore(信號量)並非 Java 語言特有的,幾乎全部的併發語言都有。因此也就存在一個信號量模型的概念,以下圖所示:數據庫

信號量模型

信號量模型比較簡單,能夠歸納爲:一個計數器、一個隊列、三個方法併發

計數器:記錄當前還能夠運行多少個資源訪問資源。框架

隊列:待訪問資源的線程ide

三個方法學習

  • init():初始化計數器的值,可就是容許多少線程同時訪問資源。
  • up():計數器加1,有線程歸還資源時,若是計數器的值大於或者等於 0 時,從等待隊列中喚醒一個線程
  • down():計數器減 1,有線程佔用資源時,若是此時計數器的值小於 0 ,線程將被阻塞。

這三個方法都是原子性的,由實現方保證原子性。例如在 Java 語言中,JUC 包下的 Semaphore 實現了信號量模型,因此 Semaphore 保證了這三個方法的原子性。ui

Semaphore 是基於 AbstractQueuedSynchronizer 接口實現信號量模型的。AbstractQueuedSynchronizer 提供了一個基於 FIFO 隊列,能夠用於構建鎖或者其餘相關同步裝置的基礎框架,利用了一個 int 來表示狀態,經過相似 acquire 和 release 的方式來操縱狀態。關於 AbstractQueuedSynchronizer 更多的介紹,能夠點擊連接:spa

ifeve.com/introduce-a…線程

AbstractQueuedSynchronizer 在 Semaphore 類中的實現類以下:翻譯

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
複製代碼

在 Semaphore 類中,實現了兩種信號量:公平的信號量和非公平的信號量,公平的信號量就是你們排好隊,先到先進,非公平的信號量就是不必定先到先進,容許插隊。非公平的信號量效率會高一些,因此默認使用的是非公平信號量。具體的能夠查看 Semaphore 類實現源碼。

Semaphore 類中,主要有如下方法:

// 構造方法,參數表示許可證數量,用來建立信號量
public Semaphore(int permits);
// 從信號量中獲取許可,至關於獲取到執行權
public void acquire() throws InterruptedException;
// 嘗試獲取1個許可,無論是否可以獲取成功,都當即返回,true表示獲取成功,false表示獲取失敗
public boolean tryAcquire();
// 將許可還給信號量
public void release();
複製代碼

Semaphore 類的實現就瞭解的差很少了。可能你會有疑問 Semaphore 的應用場景是什麼?Semaphore 能夠用來限流(流量控制),在一些公共資源有限的場景下,Semaphore 能夠派上用場。好比在作日誌清洗時,可能有幾十個線程在併發清洗,可是將清洗的數據存入到數據庫時,可能只給數據庫分配了 10 個鏈接池,這樣兩邊的線程數就不對等了,咱們必須保證同時只能有 10 個線程獲取數據庫連接,不然就會存在大量線程沒法連接上數據庫。

用 Semaphore 信號量來模擬這操做,代碼以下:

public class SemaphoreDemo {
    /** * semaphore 信號量,能夠限流 * * 模擬併發數據庫操做,同時有三十個請求,可是系統每秒只能處理 5 個 */

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);
	// 初始化信號量,個數爲 5 
    private static Semaphore s = new Semaphore(5);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 獲取許可
                        s.acquire();
                        System.out.println(Thread.currentThread().getName()+" 完成數據庫操做 ,"+ new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format( new Date()));
                        // 休眠兩秒鐘,效果更直觀
                        Thread.sleep(2000);
                        // 釋放許可
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }
		// 關閉鏈接池
        threadPool.shutdown();
    }
}
複製代碼

運行效果以下:

圖片描述

從結果中,能夠看出,每秒只有 5 個線程在執行,這符合咱們的預期。

好了,關於 Semaphore 的內容就結束了,更加詳細的還請您查閱相關資料和閱讀 Semaphore 源碼。但願這篇文章對您的學習或者工做有所幫助。

感謝您的閱讀,祝好。

最後

目前互聯網上不少大佬都有 Semaphore(信號量) 相關文章,若有雷同,請多多包涵了。原創不易,碼字不易,還但願你們多多支持。若文中有所錯誤之處,還望提出,謝謝。

互聯網平頭哥
相關文章
相關標籤/搜索