Eureka系列(六) TimedSupervisorTask類解析

  爲何要單獨講解TimedSupervisorTask這個類呢?由於這個類在咱們DiscoveryClient類的initScheduledTasks方法進行定時任務初始化時被使用得比較多,因此咱們須要瞭解下這個類,咱們先看下TimedSupervisorTask這個類在initScheduledTasks的具體使用:ide

private final ScheduledExecutorService scheduler;
private void initScheduledTasks() {
    …省略其餘代碼
    // 初始化定時拉取服務註冊信息
    scheduler.schedule(
        new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        ),
        registryFetchIntervalSeconds, TimeUnit.SECONDS);

    …省略其餘代碼
    // 初始化定時服務續約任務
    scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        ),
       renewalIntervalInSecs, TimeUnit.SECONDS);
       …省略其餘代碼
}

  因而可知,TimedSupervisorTask類被使用在了定時任務的初始化中,咱們具體來看看這個類的結構:ui

public class TimedSupervisorTask extends TimerTask {
    private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);

    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    private final LongGauge threadPoolLevelGauge;

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor executor;
    private final long timeoutMillis;
    private final Runnable task;

    private final AtomicLong delay;
    private final long maxDelay;

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }
    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
            long currentDelay = delay.get();
            // 若是出現異常,則將時間*2,而後取 定時時間 和 最長定時時間中最小的爲下次任務執行的延時時間
            long newDelay = Math.min(maxDelay, currentDelay * 2);  
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }
            throwableCounter.increment();
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}

  咱們能夠仔細看看run方法的具體實現,由於這裏有一個值得借鑑的設計思路!!!this

  咱們簡單來看看這個方法具體執行流程:
    1.執行submit()方法提交任務
    2.執行future.get()方法,若是沒有在規定的時間獲得返回值或者任務出現異常,則進入異常處理catch代碼塊。
    3.若是發生異常
      a. 發生TimeoutException異常,則執行Math.min(maxDelay, currentDelay ✖️ 2);獲得任務延時時間 ✖️ 2 和 最大延時時間的最小值,而後改變任務的延時時間timeoutMillis(延時任務時間默認值是30s)
      b.發生RejectedExecutionException異常,則將rejectedCounter值+1
      c.發生Throwable異常,則將throwableCounter值+1
    4.若是沒有發生異常,則再設置一次延時任務時間timeoutMillis
    5.進入finally代碼塊
      a.若是future不爲null,則執行future.cancel(true),中斷線程中止任務
      b.若是線程池沒有shutdown,則建立一個新的定時任務
spa

\(\color{red}{注意}\):不知道有沒有小夥伴發現,無論咱們的定時任務執行是成功仍是結束(若是尚未執行結束,也會被中斷),而後會再從新初始化一個新的任務。而且這個任務的延時時間還會由於不一樣的狀況受到改變,在try代碼塊中若是不發現異常,則會從新初始化延時時間,若是發生TimeoutException異常,則會更改延時時間,更改成 任務延時時間 ✖️ 2 和 最大延時時間的最小值。因此咱們會發現這樣的設計會讓整個延時任務很靈活。若是不發生異常,則延時時間不會變;若是發現異常,則增加延時時間;若是程序又恢復正常了,則延時時間又恢復成了默認值。線程

總結:咱們在設計延時/週期性任務時就能夠參考TimedSupervisorTask的實現,程序一旦遇到發生超時異常,就將間隔時間調大,若是連續超時,那麼每次間隔時間都會增大一倍,一直到達外部參數設定的上限爲止,一旦新任務再也不發生超時異常,間隔時間又會自動恢復爲初始值。設計

相關文章
相關標籤/搜索