併發編程之定時任務

ScheduledThreadPoolExecutor

  ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor 實現了ScheduledExecutorService。主要用來處理延時任務和定時任務。java

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

定時線程池的執行原理與通常的線程池執行過程有點差異,具體的定時線程的執行原理以下圖所示:算法

 定時線程池主要是接收ScheduledFutureTask任務,是線程池調度任務的最小單位,有3種提交方式:數組

1. schedule:schedule方法是指任務在指定延遲時間後觸發,只執行一次。
2. scheduledAtFixedRate:
3. scheduledWithFixedDelay:
具體的案例以下:
package com.test.executor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SchelduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledService=Executors.newScheduledThreadPool(5);
scheduledService.schedule(
new Runnable() { @Override public void run() { System.out.println("延時任務執行"); } }, 1, TimeUnit.SECONDS); scheduledService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // TODO Auto-generated method stub System.out.println("無論任務是否執行完成,每過3秒產生一個新線程"); } }, 1, 3, TimeUnit.SECONDS); scheduledService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { System.out.println("前一個任務執行完成之後,隔3秒執行下一個任務"); } }, 1, 3, TimeUnit.SECONDS); } }

定時線程池採用的是DelayQueue,是個無界隊列,內部封裝了priorityQueue,根據time時間前後進行排序,若time相同則用sequenceNumber排序。緩存

ScheduledFutureTask 有如下三種屬性:網絡

一、private final long sequenceNumber;任務序號
二、private final long period;任務執行時間間隔數據結構

三、private long time;任務開始時間多線程

工做線程的執行過程:ide

一、工做線程從DelayQueue從獲取到期的任務去執行;this

二、執行結束後從新設置任務的到期時間,再次放回到DelayQueue中去;spa

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }


 private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())//若是線程池已經關閉
            reject(task);//拒絕任務
        else {
            super.getQueue().add(task);//不然直接加入隊列
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);//若當前線程沒法執行,則取消
            else
                ensurePrestart();//增長一個worker,避免提交的任務沒有線程去執行,緣由就是該類沒有像ThreadPoolExecutor同樣,woker滿了才放入隊列
        }
    }
void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

ScheduledThreadPoolExecutor會把執行的任務放到DelayQueue中去,DelayQueue中封裝了一個PriorityQueue隊列,該隊列會對任務ScheduledFutureTask按照時間順序進行排序,排序算法以下:

public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0)? 0 : ((d < 0)? -1 : 1);
        }

由以上代碼能夠看出,

一、先按照time進行排序,時間小的排在前面,時間大的排在後面;

二、time相同的話,就按照sequenceNumber來進行排序,sequenceNumber小的排在前面,大的排在後面。即時間相同,先提交的優先執行。

定時線程池任務運行的核心就是ScheduledFutureTask的run方法,下面來看一下:

 public void run() {
            boolean periodic = isPeriodic();
        //若是當前線程已經不支持執行任務,則取消
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
    //不須要週期性執行任務,則直接執行run而後結束任務
            else if (!periodic)
                ScheduledFutureTask.super.run();
    //若是須要週期性執行任務,則執行完任務之後,設置下一次執行時間,而後重複執行
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();//設置下一次執行時間
                reExecutePeriodic(outerTask);//重複執行任務
            }
        }
總結一下,具體的執行步驟以下:
1. 若是當前線程池運行狀態不能夠執行任務,取消該任務,而後直接返回,不然執行
步驟2;
2. 若是不是週期性任務,調用FutureTask中的run方法執行,會設置執行結果,而後
直接返回,不然執行步驟3;
3. 若是是週期性任務,調用FutureTask中的runAndReset方法執行,不會設置執行
結果,而後直接返回,不然執行步驟4和步驟5;
4. 計算下次執行該任務的具體時間;
5. 重複執行任務。
 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

reExecutePeriodic方法與delayedExecute方法相似,可是不一樣的是:

一、因爲調用reExecutePeriodic方法的時候,任務已經執行過一次了,因此不會拒絕當前任務;

二、傳入的任務必定是週期性執行的任務。

DelayedWorkQueue

DelayedWorkQueue是一個基於堆數據結構的無界隊列。在執行任務的時候每一個任務的執行時間都不同,因此它的工做就是按照時間升序排列,執行時間距離當前時間越近則越排在隊列前。可是這裏的順序並非絕對的,由於堆中排序只是保證了子節點的任務執行時間要比父節點的下次執行時間要大,各個子節點之間並非順序排列的。

堆的數據結構以下:

 

 堆結構能夠轉換成數組,以下:

{1,3,4,8,10,15,20}

假設,索引值從0開始,子節點的索引值爲k,父節點的索引值爲p,則:
1. 一個節點的左子節點的索引爲:k = p * 2 + 1;
2. 一個節點的右子節點的索引爲:k = (p + 1) * 2;
3. 一個節點的父節點的索引爲:p = (k - 1) / 2。

爲何使用DelayedWorkQueue?

 由於定時任務須要取出最近須要執行的任務,因此任務隊列當中每次出隊的必定是隊列當中最靠前的任務,因此要用優先級隊列。而DelayedWorkQueue就是一個優先級隊列,能夠保證每次出隊的任務都是當前隊列當中最靠前的任務,並且該隊列仍是堆結構的,在執行插入和刪除的時候複雜度爲O(logN).
 

DelayedWorkQueue屬性

private static final int INITIAL_CAPACITY = 16;隊列的初始容量
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];根據初始容量建立的RunnableScheduledFuture數組
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

private Thread leader = null;leader線程

 private final Condition available = lock.newCondition();當較新的任務在隊列的頭部可用時,或者說有新的線程可能須要成爲leader的時候,經過這個條件發出信號。

對於多線程網絡模型來講,線程有三種身份,leader,follower,proccesser。基準就是永遠最多隻有一個leader,全部的follower都在等待成爲leader。線程池啓動會自動產生一個leader,負責等待網絡IO時間,當有一個事件產生的時候leader首先通知一個follower成爲leader,而後本身就去處理這個網絡事件,完畢之後本身加入follower線程等待隊列當中去,等待下一次成爲leader。這樣的話能夠加強CPU的高速緩存性,消除動態內存分配以及線程間的數據交換。

 Offer方法

public boolean offer(Runnable x) {
//參數校驗
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
//查看當前元素數量,若是大於隊列長度則進行擴容
        int i = size;
        if (i >= queue.length)
            grow();
//元素數量加1
        size = i + 1;
//若是當前隊列尚未元素,則直接加入頭部
        if (i == 0) {
            queue[0] = e;
//記錄索引
            setIndex(e, 0);
        } else {
  //把任務加入堆中,並調整堆結構,這裏就會根據任務的觸發時間排列
             //把須要最先執行的任務放在前面
            siftUp(i, e);
        }
//若是新加入的元素就是隊列頭,這裏有兩種狀況
        //1.這是用戶提交的第一個任務
        //2.新任務進行堆調整之後,排在隊列頭
        if (queue[0] == e) {
// leader設置爲null爲了使在take方法中的線程在經過available.signal();後會執行
available.awaitNanos(delay);
            leader null;
//加入元素之後,喚醒worker線程
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

來看一下siftUp()方法:

private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
   // 獲取父節點
        int parent = (k ­ 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
   // 若是key節點的執行時間大於父節點的執行時間,不須要再排序了
        if (key.compareTo(e) >= 0)
            break;
  // 若是key.compareTo(e) < 0,說明key節點的執行時間小於父節點的執行時間,須要把父節點移到後面
        queue[k] = e;
        setIndex(e, k);
// 設置索引爲k
        k = parent;
    }
// key設置爲排序後的位置中
    queue[k] = key;
    setIndex(key, k);
}

簡言之就是循環的根據key節點與他的父節點進行比較,key節點的時間小於父節點,則交換位置,將時間點靠前的排在前面。

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

在getTask的時候使用take方法,該方法是保證當前對列取出的就是最新須要執行的任務。保證了任務只有在指定的執行時間的時候才能夠被取走。

相關文章
相關標籤/搜索