Java併發——ScheduledThreadPoolExecutor分析

ScheduledThreadPoolExecutor分析

從圖中咱們能夠看到ScheduledThreadPoolExecutor繼承ThreadPoolExecutor實現了ScheduledExecutorService接口。它至關於提供了"延遲"和"週期執行"功能的ThreadPoolExecutor,還有兩個重要內部類DelayedWorkQueueScheduledFutureTask數據結構

構造方法

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
複製代碼

由於其繼承了ThreadPoolExecutor,調用了ThreadLocalExecutor的構造方法。當核心線程數達到corePoolSize,會將任務提交給有界阻塞隊列DelayedWorkQueue。ScheduledThreadPoolExecutor線程池最大線程數爲Integer.MAX_VALUE測試

主要方法

ScheduledThreadPoolExecutor實現了ScheduledExecutorService接口,該接口提供了以下方法:this

// 在給定延遲後,執行Runnable任務
    public ScheduledFuture schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 在給定延遲後,執行Callable任務
    public  ScheduledFuture schedule(Callable callable,
                                           long delay, TimeUnit unit);
    // 給定延遲(initialDelay)以後,隨後以給定時間(period)爲週期執行任務
    // 即執行將在initialDelay以後開始,而後是initialDelay+period,
    // 再是initialDelay + 2*period,依此類推
    // 若是上一個任務沒有執行完畢,則須要等上一個任務執行完畢後當即執行
    public ScheduledFuture scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    // 建立並執行在給定的初始延遲(initialDelay)以後首先啓用的按期操做
    // 隨後每一個任務執行的終止和下一個執行的開始之間給定的延遲(delay)
    public ScheduledFuture scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
複製代碼

第1、第二個schedule方法都是一次性操做只不過入參一個是Runnable,一個是callable

scheduleAtFixedRate、scheduleWithFixedDelay方法能夠看以下示例spa

  • 示例:

  • public static void main(String[] args) {
            SimpleDateFormat sdf = new SimpleDateFormat("hh:MM:ss");
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
            Runnable task1 = () -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "測試" + sdf.format(new Date()));
            };
            Runnable task2 = () -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "測試" + sdf.format(new Date()));
            };
            executorService.scheduleAtFixedRate(task1, 0, 2, TimeUnit.SECONDS);
            executorService.scheduleWithFixedDelay(task2, 0, 2, TimeUnit.SECONDS);
        }
        
        輸出:
        pool-1-thread-1測試11:12:37
        pool-1-thread-2測試11:12:37
        pool-1-thread-1測試11:12:40
        pool-1-thread-2測試11:12:42
        pool-1-thread-1測試11:12:43
        pool-1-thread-1測試11:12:46
        pool-1-thread-2測試11:12:47
    複製代碼

    週期間隔2秒,任務耗時3秒
    scheduleAtFixedRate方法:
    1.若任務耗時超過週期間隔,則須要等待上個任務完成下個任務才能執行
    2.若任務耗時小於週期間隔,則下個任務按週期間隔執行任務
    scheduleWithFixedDelay方法:
    1.下任務等到上個任務執行完成+週期間隔以後才執行任務線程

  • schedule方法
    邏輯處理相差很少,以schedule方法爲例分析
    public  ScheduledFuture schedule(Callable callable,
                                               long delay,
                                               TimeUnit unit) {
            if (callable == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture t = decorateTask(callable,
                new ScheduledFutureTask(callable,
                                           triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
    複製代碼
    先參數校驗,再構造task,最後調用delayedExecute()方法延遲執行任務

    rest

    private void delayedExecute(RunnableScheduledFuture task) { // 判斷線程池是否處於RUNNING狀態,不處於則根據相應拒絕策略拒絕任務 if (isShutdown()) reject(task); else { // 往阻塞隊列中添加任務 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }
    複製代碼
    複製代碼void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } 複製代碼複製代碼
    ensurePrestart主要調用了addWorker方法,此方法主要作了兩件事:
    1.循環CAS將線程池中的線程數加一
    2.新建一個線程並啓用

    當線程執行任務,都會調用到任務的run()方法code

    public void run() {
                // 判斷是不是週期任務
                boolean periodic = isPeriodic();
                // 判斷當前線程狀態是否能執行任務
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                // 不是週期性任務,直接執行任務    
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                // 如果週期性任務,設置下次執行任務的時間    
                else if (ScheduledFutureTask.super.runAndReset()) {
                    // 設置任務下次執行時間
                    setNextRunTime();
                    // 將下次任務往阻塞隊列中添加
                    reExecutePeriodic(outerTask);
                }
            }
    複製代碼

    1.先判斷該任務是否能夠執行,若不能執行則調用cancel方法取消
    2.再判斷是不是週期性任務,若不是直接執行
    3.最後調用runAndReset方法執行任務並重置,setNextRunTime方法設置任務下次的執行時間,reExecutePeriodic方法從新把任務添加到隊列中.orm

    private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
        
        void reExecutePeriodic(RunnableScheduledFuture task) {
            if (canRunInCurrentRunState(true)) {
                super.getQueue().add(task);
                if (!canRunInCurrentRunState(true) && remove(task))
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }
    複製代碼

    DelayedWorkQueue

    ScheduledThreadPoolExecutor是把任務添加到DelayedWorkQueue中,它是一個基於堆的數據結構,經過ScheduledFutureTask的compareTo方法比較大小,小的排在前面,大的排在後面cdn

    public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask x = (ScheduledFutureTask)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
    複製代碼

    首先按照time排序,time小的排在前面,大的排在後面,若time相同,則使用sequenceNumber排序,小的排在前面,大的排在後面blog

  • 相關文章
    相關標籤/搜索