實現限制執行頻率的線程池

Java 中的線程池(ThreadPoolExecutor)咱們都知道(不知道請自行搜索),它的執行機制簡單講就是多個線程不停的從隊列裏面取任務執行。可是咱們可能遇到下面這樣的場景:java

我有一批數據要經過線程池來處理,處理過程當中須要調用某個遠程服務。但該服務存在調用頻率限制,好比每秒鐘最多調用 50 次,超過這個閾值將返回錯誤信息。安全

這是否意味着咱們不該該用多線程了呢?不是,在這個場景中,咱們要保證的是以間隔不低於 20ms 的頻率發起請求,至於處理時間,無論是幾百甚至幾千毫秒,都不影響發起請求的頻率,所以多線程是必要的。多線程

默認的線程池(ThreadPoolExecutor)沒有按固定頻率執行任務的特性,有的同窗可能會想到 ScheduledThreadPoolExecutor,可是很惋惜這個類也不能用,別看它名字裏面帶了計劃任務的特性,但這個是用來反覆執行同一個任務的,而咱們的場景是一個任務只執行一次。ide

固然也有的同窗會想到一種方案,依舊使用 ScheduledThreadPoolExecutor,可是將任務隊列外部化(即不使用 ScheduledThreadPoolExecutor 的內部任務隊列),而後 ScheduledThreadPoolExecutor 的任務自己就是從外部隊列取任務執行。ui

這種方案是可行的,可是拋開實現起來過於複雜不說,線程池的執行機制也會遭到破壞,好比說咱們原本能夠經過 shutdown()awaitTermination() 來等待線程池隊列所有執行完,令線程池安全關閉;但若任務隊列外部化,這點就作不到了,由於線程池會馬上關閉,不會再處理外部隊列中的剩餘任務。this

這裏有一個相對簡單的解決方案。好在 ThreadPoolExecutor 給咱們提供了 beforeExecute() 這樣一個擴展點,咱們能夠經過繼承 ThreadPoolExecutor,覆寫這個方法來實現執行頻率的限制:線程

  1. 使每一個線程在執行任務前延遲一段時間;
  2. 使用一個信號量來同步這段延遲,這樣每一個線程在執行任務前被這個信號量鎖住,拿到鎖後延遲一段時間再釋放鎖,而後再執行任務。

因而可知,這樣的設計既實現了執行頻率限制,又保持了任務執行自己的並行性,同時線程池的執行機制沒有受到影響。設計

代碼實現起來不復雜,以下:code

public class FundThreadPoolExecutor extends ThreadPoolExecutor {

    private int fixedRateMillis;

    private final Semaphore fixedRateSemaphore = new Semaphore(1);

    // 設置執行頻率限制的延遲時間(ms)
    public void setFixedRateMillis(int fixedRateMillis) {
        this.fixedRateMillis = fixedRateMillis;
    }

    public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (this.fixedRateMillis > 0) {
            try {
                this.fixedRateSemaphore.acquire();
                Thread.sleep(this.fixedRateMillis);
            } catch (InterruptedException e) {
                // ignore this
            } finally {
                this.fixedRateSemaphore.release();
            }
        }
    }
}
相關文章
相關標籤/搜索