Java 中的線程池(ThreadPoolExecutor)咱們都知道(不知道請自行搜索),它的執行機制簡單講就是多個線程不停的從隊列裏面取任務執行。可是咱們可能遇到下面這樣的場景:java
我有一批數據要經過線程池來處理,處理過程當中須要調用某個遠程服務。但該服務存在調用頻率限制,好比每秒鐘最多調用 50 次,超過這個閾值將返回錯誤信息。安全
這是否意味着咱們不該該用多線程了呢?不是,在這個場景中,咱們要保證的是以間隔不低於 20ms 的頻率發起請求,至於處理時間,無論是幾百甚至幾千毫秒,都不影響發起請求的頻率,所以多線程是必要的。多線程
默認的線程池(ThreadPoolExecutor)沒有按固定頻率執行任務的特性,有的同窗可能會想到 ScheduledThreadPoolExecutor,可是很惋惜這個類也不能用,別看它名字裏面帶了計劃任務的特性,但這個是用來反覆執行同一個任務的,而咱們的場景是一個任務只執行一次。ide
固然也有的同窗會想到一種方案,依舊使用 ScheduledThreadPoolExecutor,可是將任務隊列外部化(即不使用 ScheduledThreadPoolExecutor 的內部任務隊列),而後 ScheduledThreadPoolExecutor 的任務自己就是從外部隊列取任務執行。ui
這種方案是可行的,可是拋開實現起來過於複雜不說,線程池的執行機制也會遭到破壞,好比說咱們原本能夠經過 shutdown()
和 awaitTermination()
來等待線程池隊列所有執行完,令線程池安全關閉;但若任務隊列外部化,這點就作不到了,由於線程池會馬上關閉,不會再處理外部隊列中的剩餘任務。this
這裏有一個相對簡單的解決方案。好在 ThreadPoolExecutor 給咱們提供了 beforeExecute()
這樣一個擴展點,咱們能夠經過繼承 ThreadPoolExecutor,覆寫這個方法來實現執行頻率的限制:線程
因而可知,這樣的設計既實現了執行頻率限制,又保持了任務執行自己的並行性,同時線程池的執行機制沒有受到影響。設計
代碼實現起來不復雜,以下:code
public class FundThreadPoolExecutor extends ThreadPoolExecutor { private int fixedRateMillis; private final Semaphore fixedRateSemaphore = new Semaphore(1); // 設置執行頻率限制的延遲時間(ms) public void setFixedRateMillis(int fixedRateMillis) { this.fixedRateMillis = fixedRateMillis; } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { if (this.fixedRateMillis > 0) { try { this.fixedRateSemaphore.acquire(); Thread.sleep(this.fixedRateMillis); } catch (InterruptedException e) { // ignore this } finally { this.fixedRateSemaphore.release(); } } } }