深刻理解Java線程池

深刻理解Java線程池

線程池初探

 所謂線程池,就是將多個線程放在一個池子裏面(所謂池化技術),而後須要線程的時候不是建立一個線程,而是從線程池裏面獲取一個可用的線程,而後執行咱們的任務。線程池的關鍵在於它爲咱們管理了多個線程,咱們不須要關心如何建立線程,咱們只須要關係咱們的核心業務,而後須要線程來執行任務的時候從線程池中獲取線程。任務執行完以後線程不會被銷燬,而是會被從新放到池子裏面,等待機會去執行任務。java

 咱們爲何須要線程池呢?首先一點是線程池爲咱們提升了一種簡易的多線程編程方案,咱們不須要投入太多的精力去管理多個線程,線程池會自動幫咱們管理好,它知道何時該作什麼事情,咱們只要在須要的時候去獲取就能夠了。其次,咱們使用線程池很大程度上歸咎於建立和銷燬線程的代價是很是昂貴的,甚至咱們建立和銷燬線程的資源要比咱們實際執行的任務所花費的時間還要長,這顯然是不科學也是不合理的,並且若是沒有一個合理的管理者,可能會出現建立了過多的線程的狀況,也就是在JVM中存活的線程過多,而存活着的線程也是須要銷燬資源的,另一點,過多的線程可能會形成線程過分切換的尷尬境地。編程

對線程池有了一個初步的認識以後,咱們來看看如何使用線程池。數組

// 建立一個線程池
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        // 提交任務
        executorService.submit(() -> System.out.println("run"));
        Future<String> stringFuture = executorService.submit(() -> "run");

        // 建立一個調度線程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // 提交一個週期性執行的任務
        scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("schedule"),0,1, TimeUnit.SECONDS);

        // shutdown
        executorService.shutdown();
        scheduledExecutorService.shutdown();

能夠發現使用線程池很是簡單,只須要極少的代碼就能夠建立出咱們須要的線程池,而後將咱們的任務提交到線程池中去。咱們只須要在結束之時記得關閉線程池就能夠了。本文的重點並不是在於如何使用線程池,而是試圖剖析線程池的實現,好比一個調度線程池是怎麼實現的?是靠什麼實現的?爲何能這樣實現等等問題。數據結構

Java線程池實現架構

Java中與線程池相關的類都在java.util.concurrent包下,以下展現了一些:多線程

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor
  • Executors

 經過上面一節中的使用示例,能夠發現Executors類是一個建立線程池的有用的類,事實上,Executors類的角色也就是建立線程池,它是一個工廠類,能夠產生不一樣類型的線程池。而Executor是線程池的鼻祖類,它有兩個子類是ExecutorServiceScheduledExecutorService,而ThreadPoolExecutorScheduledThreadPoolExecutor則是真正的線程池,咱們的任務將被這兩個類交由其所管理者的線程池運行,能夠發現,ScheduledThreadPoolExecutor是一個萬千寵愛於一身的類,下面咱們能夠看看它的類關係圖:架構

@ScheduledThreadPoolExecutor類圖|center

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutorThreadPoolExecutor實現了通常的線程池,沒有調度功能,而ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor的實現,而後增長了調度功能。less

最爲原始的Executor只有一個方法execute,它接受一個Runnable類型的參數,意思是使用線程池來執行這個Runnable,能夠發現Executor不提供有返回值的任務。ExecutorService繼承了Executor,而且極大的加強了Executor的功能,不只支持有返回值的任務執行,並且還有不少十分有用的方法來爲你提供服務,下面展現了ExecutorService提供的方法:函數

@ExecutorService提供的方法|center

ScheduledExecutorService繼承了ExecutorService,而且增長了特有的調度(schedule)功能。關於Executor、ExecutorService和ScheduledExecutorService的關係,能夠見下圖:oop

Executor、ExecutorService和ScheduledExecutorService的關係|center

總結一下,通過咱們的調研,能夠發現其實對於咱們編寫多線程代碼來講,最爲核心的是Executors類,根據咱們是須要ExecutorService類型的線程池仍是ScheduledExecutorService類型的線程池調用相應的工廠方法就能夠了,而ExecutorService的實現表如今ThreadPoolExecutor上,ScheduledExecutorService的實現則表如今ScheduledThreadPoolExecutor上,下文將分別剖析這二者,嘗試弄清楚線程池的原理。學習

ThreadPoolExecutor解析

 上文中描述了Java中線程池相關的架構,瞭解了這些內容其實咱們就可使用java的線程池爲咱們工做了,使用其提供的線程池咱們能夠很方便的寫出高質量的多線程代碼,本節將分析ThreadPoolExecutor的實現,來探索線程池的運行原理。下面的圖片展現了ThreadPoolExecutor的類圖:

@ThreadPoolExecutor的類圖|center

private final BlockingQueue<Runnable> workQueue;  // 任務隊列,咱們的任務會添加到該隊列裏面,線程將從該隊列獲取任務來執行
 
private final HashSet<Worker> workers = new HashSet<Worker>();//全部工做線程的集合,來消費workQueue裏面的任務
  
private volatile ThreadFactory threadFactory;//線程工廠
      
private volatile RejectedExecutionHandler handler;//拒絕策略,默認會拋出異常,還要其餘幾種拒絕策略以下:
一、CallerRunsPolicy:在調用者線程裏面運行該任務
二、DiscardPolicy:丟棄任務
三、DiscardOldestPolicy:丟棄workQueue的頭部任務
private volatile int corePoolSize;//最下保活work數量
 
private volatile int maximumPoolSize;//work上限

咱們嘗試執行submit方法,下面是執行的關鍵路徑,總結起來就是:若是Worker數量還沒達到上限則繼續建立,不然提交任務到workQueue,而後讓worker來調度運行任務。

step 1: <ExecutorService>
Future<?> submit(Runnable task); 
 
step 2:<AbstractExecutorService>
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
 
step 3:<Executor>
void execute(Runnable command);

step 4:<ThreadPoolExecutor>
 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //提交咱們的任務到workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //使用maximumPoolSize做爲邊界
        reject(command); //還不行?拒絕提交的任務
}
step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core)
 
 
step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包裝任務
final Thread t = w.thread; //獲取線程(包含任務)
workers.add(w);   // 任務被放到works中
t.start(); //執行任務

上面的流程是高度歸納的,實際狀況遠比這複雜得多,可是咱們關心的是怎麼打通整個流程,因此這樣分析問題是沒有太大的問題的。觀察上面的流程,咱們發現其實關鍵的地方在於Worker,若是弄明白它是如何工做的,那麼咱們也就大概明白了線程池是怎麼工做的了。下面分析一下Worker類。

@Worker類圖|center

上面的圖片展現了Worker的類關係圖,關鍵在於他實現了Runnable接口,因此問題的關鍵就在於run方法上。在這以前,咱們來看一下Worker類裏面的關鍵成員:

/** Thread this worker is running in.  Null if factory fails. */
final Thread thread;
/** Initial task to run.  Possibly null. */
Runnable firstTask; // 咱們提交的任務,可能被馬上執行,也可能被放到隊列裏面

thread是Worker的工做線程,上面的分析咱們也發現了在addWorker中會獲取worker裏面的thread而後start,也就是這個線程的執行,而Worker實現了Runnable接口,因此在構造thread的時候Worker將本身傳遞給了構造函數,thread.start執行的其實就是Worker的run方法。下面是run方法的內容:

/** Delegates main run loop to outer runWorker  */
public void run() {
     runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
}

咱們來分析一下runWorker這個方法,這就是整個線程池的核心。首先獲取到了咱們剛提交的任務firstTask,而後會循環從workQueue裏面獲取任務來執行,獲取任務的方法以下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
    }
}

其實核心也就一句:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

 咱們再回頭看一下execute,其實咱們上面只走了一條邏輯,在execute的時候,咱們的worker的數量尚未到達咱們設定的corePoolSize的時候,會走上面咱們分析的邏輯,而若是達到了咱們設定的閾值以後,execute中會嘗試去提交任務,若是提交成功了就結束,不然會拒絕任務的提交。咱們上面還提到一個成員:maximumPoolSize,其實線程池的最大的Worker數量應該是maximumPoolSize,可是咱們上面的分析是corePoolSize,這是由於咱們的private boolean addWorker(Runnable firstTask, boolean core)的參數core的值來控制的,core爲true則使用corePoolSize來設定邊界,不然使用maximumPoolSize來設定邊界。直觀的解釋一下,當線程池裏面的Worker數量尚未到corePoolSize,那麼新添加的任務會伴隨着產生一個新的worker,若是Worker的數量達到了corePoolSize,那麼就將任務存放在阻塞隊列中等待Worker來獲取執行,若是沒有辦法再向阻塞隊列聽任務了,那麼這個時候maximumPoolSize就變得有用了,新的任務將會伴隨着產生一個新的Worker,若是線程池裏面的Worker已經達到了maximumPoolSize,那麼接下來提交的任務只能被拒絕策略拒絕了。能夠參考下面的描述來理解:

* When a new task is submitted in method {@link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle.  If there are more than corePoolSize but less than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full.  By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {@code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {@link #setCorePoolSize} and {@link
 * #setMaximumPoolSize}.

在此須要說明一點,有一個重要的成員:keepAliveTime,當線程池裏面的線程數量超過corePoolSize了,那麼超出的線程將會在空閒keepAliveTime以後被terminated。能夠參考下面的文檔:

* If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).

ScheduledThreadPoolExecutor解析

ScheduledThreadPoolExecutor適用於延時執行,或者週期性執行的任務調度,ScheduledThreadPoolExecutor在實現上繼承了ThreadPoolExecutor,因此你依然能夠將ScheduledThreadPoolExecutor當成ThreadPoolExecutor來使用,可是ScheduledThreadPoolExecutor的功能要強大得多,由於ScheduledThreadPoolExecutor能夠根據設定的參數來週期性調度運行,下面的圖片展現了四個和週期性相關的方法:

@四個Scheduled方法|center

  • 若是你想延時一段時間而後運行一個Callable,那麼使用的第一個方法
  • 若是你想延時一段時間以後運行一個Runnable,那麼使用第二個方法;
  • 若是你想要延時一段時間,而後根據設定的參數週期執行Runnable,那麼能夠選擇第三個和第四個方法,第三個方法和第四個方法的區別在於:第三個方法嚴格按照規劃的時間路徑來執行,好比周期爲2,延時爲0,那麼執行的序列爲0,2,4,6,8....,而第四個方法將基於上次執行時間來規劃下次的執行,也就是在上次執行完成以後再次執行。好比上面的執行序列0,2,4,6,8...,若是第2秒沒有被調度執行,而在第三秒的時候才被調度,那麼下次執行的時間不是4,而是5,以此類推。

下面來看一下這四個方法的一些細節:

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
}


public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
}


public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
}

經過上面的代碼咱們能夠發現,前兩個方法是相似的,後兩個方法也是相似的。前兩個方法屬於一次性調度,因此period都爲0,區別在於參數不一樣,一個是Runnable,而一個是Callable,好笑的是,最後都變爲了Callable了,見下面的構造函數:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

對於後兩個方法,區別僅僅在於period的,scheduleWithFixedDelay對參數進行了操做,將原來的時間變爲負數了,然後面在計算下次被調度的時間的時候會根據這個參數的正負值來分別處理,正數表明scheduleAtFixedRate,而負數表明了scheduleWithFixedDelay。

一個須要被咱們注意的細節是,以上四個方法最後都會調用一個方法: delayedExecute(t),下面看一下這個方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
}

大概的意思就是先判斷線程池是否被關閉了,若是被關閉了,則拒絕任務的提交,不然將任務加入到任務隊列中去等待被調度執行。最後的ensurePrestart的意思是須要確保線程池已經被啓動起來了。下面是這個方法:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

主要是增長了一個沒有任務的worker,有什麼用呢?咱們還記得Worker的邏輯嗎?addWorker方法的執行,會觸發Worker的run方法的執行,而後runWorker方法就會被執行,而runWorker方法是循環從workQueue中取任務執行的,因此確保線程池被啓動起來是重要的,而只須要簡單的執行addWorker便會觸發線程池的啓動流程。對於調度線程池來講,只要執行了addWorker方法,那麼線程池就會一直在後臺週期性的調度執行任務。

到此,彷佛咱們仍是沒有鬧明白ScheduledThreadPoolExecutor是如何實現週期性的,上面講到四個scheduled方法時,咱們沒有提一個重要的類:ScheduledFutureTask,對,全部神奇的事情將會發生在這個類中,下面來分析一下這個類。

@ScheduledFutureTask類圖|center

看上面的類圖,貌似這個類很是複雜,還好,咱們發現他實現了Runnable接口,那麼必然會有一個run方法,而這個run方法必然是整個類的核心,下面來看一下這個run方法的內容:

public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

首先,判斷是不是週期性的任務,若是不是,則直接執行(一次性),不然執行後,而後設置下次執行的時間,而後從新調度,等待下次執行。這裏有一個方法須要注意,也就是setNextRunTime,上面咱們提到scheduleAtFixedRate和scheduleWithFixedDelay在傳遞參數時不同,後者將delay值變爲了負數,因此下面的處理正好印證了前文所述。

/**
         * Sets the next time to run for a periodic task.
         */
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

下面來看一下reExecutePeriodic方法是如何作的,他的目標是將任務再次被調度執行,下面的代碼展現了這個功能的實現:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

能夠看到,這個方法就是將咱們的任務再次放到了workQueue裏面,那這個參數是什麼?在上面的run方法中咱們調用了reExecutePeriodic方法,參數爲outerTask,而這個變量是什麼?看下面的代碼:

/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;

這個變量指向了本身,而this的類型是什麼?是ScheduledFutureTask,也就是能夠被調度的task,這樣就實現了循環執行任務了。

上面的分析已經到了循環執行,可是ScheduledThreadPoolExecutor的功能是週期性執行,因此咱們接着分析ScheduledThreadPoolExecutor是如何根據咱們的參數走走停停的。這個時候,是應該看一下ScheduledThreadPoolExecutor的構造函數了,咱們來看一個最簡單的構造函數:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

咱們知道ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor,因此這裏的super實際上是ThreadPoolExecutor的構造函數,咱們發現其中有一個參數DelayedWorkQueue,看名字貌似是一個延遲隊列的樣子,進一步跟蹤代碼,發現了下面的一行代碼(構造函數中):
this.workQueue = workQueue;

因此在ScheduledThreadPoolExecutor中,workQueue是一個DelayedWorkQueue類型的隊列,咱們暫且認爲DelayedWorkQueue是一種具有延遲功能的隊列吧,那麼,到此咱們即可以想明白了,上面的分析咱們明白了ScheduledThreadPoolExecutor是如何循環執行任務的,而這裏咱們明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue來達到延遲的目標,因此組合起來,就能夠實現ScheduledThreadPoolExecutor週期性執行的目標。下面咱們來看一下DelayedWorkQueue是如何作到延遲的吧,上文中提到一個方法:getTask,這個方法的做用是從workQueue中取出任務來執行,而在ScheduledThreadPoolExecutor裏面,getTask方法是從DelayedWorkQueue中取任務的,而取任務無非兩個方法:poll或者take,下面咱們對DelayedWorkQueue的take方法來分析一下:

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

在for循環裏面,首先從queue中獲取第一個任務,而後從任務中取出延遲時間,然後使用available變量來實現延遲效果。這裏面須要幾個點須要探索一下:

  • 這個queue是什麼東西?
  • 延遲時間的前因後果?
  • available變量的前因後果?

對於第一個問題,看下面的代碼:
`
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];`

它是一個RunnableScheduledFuture類型的數組,下面是RunnableScheduledFuture類的類關係圖:

@RunnableScheduledFuture類關係|center

數組裏面保存了咱們的RunnableScheduledFuture,對queue的操做,主要來看一下增長元素和消費元素的操做。首先,假設使用add方法來增長RunnableScheduledFuture到queue,調用的鏈路以下:

public boolean add(Runnable e) {
            return offer(e);
        }

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

解釋一下,add方法直接轉到了offer方法,該方法中,首先判斷數組的容量是否足夠,若是不夠則grow,增加的策略以下:
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
每次增加50%。增加完成後,若是這是第一個元素,則放在座標爲0的位置,不然,使用siftUp操做,下面是該方法的內容:

private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

這個數組實現了堆這種數據結構,使用對象比較將最須要被調度執行的RunnableScheduledFuture放到數組的前面,而這得力於compareTo方法,下面是RunnableScheduledFuture類的compareTo方法的實現,主要是經過延遲時間來作比較。

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

上面是生產元素,下面來看一下消費數據。在上面咱們提到的take方法中,使用了一個方法以下:

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

這個方法中調用了一個方法siftDown,這個方法以下:

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

對其的解釋就是:
Replaces first element with last and sifts it down. Call only when holding lock.

總結一下,當咱們向queue插入任務的時候,會發生siftUp方法的執行,這個時候會把任務儘可能往根部移動,而當咱們完成任務調度以後,會發生siftDown方法的執行,與siftUp相反,siftDown方法會將任務儘可能移動到queue的末尾。總之,大概的意思就是queue經過compareTo實現了相似於優先級隊列的功能。

下面咱們來看一下第二個問題:延遲時間的前因後果。在上面的take方法裏面,首先獲取了delay,而後再使用available來作延遲效果,那這個delay從哪裏來的呢?經過上面的類圖RunnableScheduledFuture的類圖咱們知道,RunnableScheduledFuture類實現了Delayed接口,而Delayed接口裏面的惟一方法是getDelay,咱們到RunnableScheduledFuture裏面看一下這個方法的具體實現:

public long getDelay(TimeUnit unit) {
     return unit.convert(time - now(), NANOSECONDS);
 }

time是咱們設定的下次執行的時間,因此延遲就是(time - now()),沒毛病!

第三個問題:available變量的前因後果,至於這個問題,咱們看下面的代碼:

/**
 * Condition signalled when a newer task becomes available at the
 * head of the queue or a new thread may need to become leader.
 */
private final Condition available = lock.newCondition();

這是一個條件變量,take方法裏面使用這個變量來作延遲效果。Condition能夠在多個線程間作同步協調工做,更爲具體細緻的關於Condition的內容,能夠參考更多的資料來學習,本文對此知識點點到爲止。

到此爲止,咱們梳理了ScheduledThreadPoolExecutor是如何實現週期性調度的,首先分析了它的循環性,而後分析了它的延遲效果。

本文到此也就結束了,對於線程池的學習如今纔剛剛起步,須要更多更專業的知識類幫我理解更爲底層的內容,固然,爲了更進一步理解線程池的實現細節,首先須要對線程間通訊有足夠的把握,其次是要對各類數據結構有清晰的認識,好比隊列、優先級隊列、堆等高級的數據結構,以及java語言對於這些數據結構的實現,更爲重要的是要結合實際狀況分析問題,在工做和平時的學習中不斷總結,不斷迭代對於線程、線程池的認知。

相關文章
相關標籤/搜索