爲何要單獨講解TimedSupervisorTask這個類呢?由於這個類在咱們DiscoveryClient類的initScheduledTasks方法進行定時任務初始化時被使用得比較多,因此咱們須要瞭解下這個類,咱們先看下TimedSupervisorTask這個類在initScheduledTasks的具體使用:ide
private final ScheduledExecutorService scheduler; private void initScheduledTasks() { …省略其餘代碼 // 初始化定時拉取服務註冊信息 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); …省略其餘代碼 // 初始化定時服務續約任務 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); …省略其餘代碼 }
因而可知,TimedSupervisorTask類被使用在了定時任務的初始化中,咱們具體來看看這個類的結構:ui
public class TimedSupervisorTask extends TimerTask { private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class); private final Counter timeoutCounter; private final Counter rejectedCounter; private final Counter throwableCounter; private final LongGauge threadPoolLevelGauge; private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor executor; private final long timeoutMillis; private final Runnable task; private final AtomicLong delay; private final long maxDelay; public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.scheduler = scheduler; this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; // Initialize the counters and register. timeoutCounter = Monitors.newCounter("timeouts"); rejectedCounter = Monitors.newCounter("rejectedExecutions"); throwableCounter = Monitors.newCounter("throwables"); threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); Monitors.registerObject(name, this); } @Override public void run() { Future<?> future = null; try { future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); // 若是出現異常,則將時間*2,而後取 定時時間 和 最長定時時間中最小的爲下次任務執行的延時時間 long newDelay = Math.min(maxDelay, currentDelay * 2); delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } } }
咱們能夠仔細看看run方法的具體實現,由於這裏有一個值得借鑑的設計思路!!!this
咱們簡單來看看這個方法具體執行流程:
1.執行submit()方法提交任務
2.執行future.get()方法,若是沒有在規定的時間獲得返回值或者任務出現異常,則進入異常處理catch代碼塊。
3.若是發生異常
a. 發生TimeoutException異常,則執行Math.min(maxDelay, currentDelay ✖️ 2);獲得任務延時時間 ✖️ 2 和 最大延時時間的最小值,而後改變任務的延時時間timeoutMillis(延時任務時間默認值是30s)
b.發生RejectedExecutionException異常,則將rejectedCounter值+1
c.發生Throwable異常,則將throwableCounter值+1
4.若是沒有發生異常,則再設置一次延時任務時間timeoutMillis
5.進入finally代碼塊
a.若是future不爲null,則執行future.cancel(true),中斷線程中止任務
b.若是線程池沒有shutdown,則建立一個新的定時任務spa
\(\color{red}{注意}\):不知道有沒有小夥伴發現,無論咱們的定時任務執行是成功仍是結束(若是尚未執行結束,也會被中斷),而後會再從新初始化一個新的任務。而且這個任務的延時時間還會由於不一樣的狀況受到改變,在try代碼塊中若是不發現異常,則會從新初始化延時時間,若是發生TimeoutException異常,則會更改延時時間,更改成 任務延時時間 ✖️ 2 和 最大延時時間的最小值。因此咱們會發現這樣的設計會讓整個延時任務很靈活。若是不發生異常,則延時時間不會變;若是發現異常,則增加延時時間;若是程序又恢復正常了,則延時時間又恢復成了默認值。線程
總結:咱們在設計延時/週期性任務時就能夠參考TimedSupervisorTask的實現,程序一旦遇到發生超時異常,就將間隔時間調大,若是連續超時,那麼每次間隔時間都會增大一倍,一直到達外部參數設定的上限爲止,一旦新任務再也不發生超時異常,間隔時間又會自動恢復爲初始值。設計