Java定時任務Timer調度器【二】 多線程源碼分析(圖文版)

 

上一節經過一個小例子分析了Timer運行過程,牽涉的執行線程雖然只有兩個,但實際場景會比上面複雜一些。java

首先經過一張簡單類圖(只列出簡單的依賴關係)看一下Timer暴露的接口。算法

 

爲了演示Timer所暴露的接口,下面舉一個極端的例子(每個接口方法面向單獨的執行線程),照樣以鬧鐘爲例(源碼只列出關鍵部分,下同)。api

public class ScheduleDemo {

    public static void main(String[] args) throws Exception {
        AlarmTask alarm1 = new AlarmTask("鬧鐘1");
        AlarmTask alarm2 = new AlarmTask("鬧鐘2");
        new Thread("線程1"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]調度鬧鐘1");
                timer.schedule(alarm1,delay,period);
            }
        }.start();
        new Thread("線程2"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]調度鬧鐘2");
                timer.schedule(alarm2,delay,period);
            }
        }.start();
        Thread.sleep(1500);
        new Thread("線程3"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]取消鬧鐘2");
                alarm2.cancel();
            }
        }.start();
        new Thread("線程4"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]清理無用鬧鐘");
                timer.purge();
            }
        }.start();
        new Thread("線程5"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]關閉全部鬧鐘");
                timer.cancel();
            }
        }.start();
    }

    /**
     *     模擬鬧鐘
     */
    static class AlarmTask extends TimerTask{
        String name ;
        public AlarmTask(String name){
            this.name=name;
        }
        public void run() {
            log.info("["+Thread.currentThread().getName()+"]-["+name+"]嘀。。。");
            Thread.sleep(1000); //模擬鬧鐘執行時間
        }
    }
}

執行結果數組

[線程2]調度鬧鐘2
[線程1]調度鬧鐘1
[Timer-0]-[鬧鐘2]嘀。。。
[線程3]取消鬧鐘2
[線程4]清理無用鬧鐘
[線程5]關閉全部鬧鐘

下面咱們依次查看一下每一個接口方法的源碼。安全

1. 查看Timer.sched()源碼多線程

public void schedule(TimerTask task, long delay, long period) {
    sched(task, System.currentTimeMillis()+delay, -period);
}

private void sched(TimerTask task, long time, long period) {  
    // 若是period無限大,保證其在一個合理的範圍內
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;
    // 加queue鎖,保證隊列操做的線程安全
    synchronized(queue) {
        // 加lock鎖,保證任務狀態的一致性(多線程環境下)
        synchronized(task.lock) {
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }
        // 將任務加入隊列實現排序
        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
}

其中queue.add(task在)將任務加入隊列的同時實現了內部排序。工具

void add(TimerTask task) {
    // 隊列不足時,以兩倍容量擴增
    if (size + 1 == queue.length)
        // 從性能上要快於new一個數組的效率
        queue = Arrays.copyOf(queue, 2 * queue.length);
    queue[++size] = task;
    // 利用二分查找算法實現任務排序
    fixUp(size);
}

private void fixUp(int k) {
    while (k > 1) {
        int j = k >> 1;
        if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
            break;
        TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
        k = j;
    }
}

從方法sched()能夠看到,該方法一方面持有queue鎖,用來維護隊列排序的線程安全;一方面持有lock鎖,用來維護任務狀態的線程安全。oop

2. 查看TimerTask.cancel()源碼性能

public abstract class TimerTask implements Runnable {
  
    final Object lock = new Object();

    public boolean cancel() {
        synchronized(lock) {
            boolean result = (state == SCHEDULED);
            state = CANCELLED;
            return result;
        }
    }

對於任務的取消操做,只是簡單的修改一下任務狀態,中途也只佔有一個lock鎖!接着看一下執行任務的線程邏輯。ui

class TimerThread extends Thread {
   
    private TaskQueue queue;

    public void run() {
       mainLoop();
    }

    private void mainLoop() {
        while (true) {
                synchronized(queue) {
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    task = queue.getMin();
                    // 此處加task鎖,防止其餘線程同時調用task.cancel()
                    synchronized(task.lock) {
                      // ...維護鬧鐘狀態
                    }
                  }
                  if (!taskFired) // 時間未到
                    queue.wait(executionTime - currentTime);
                }
                if (taskFired)
                    // 執行鬧鐘時,沒有保持任何鎖
                    task.run();
        }
    }

能夠看到當TimerThead真正執行鬧鐘時,是沒有持鎖的,因此當鬧鐘正在運行的時候AlarmTask.cancel()對其是不起做用的,換言之,只能取消下一次將要執行的鬧鐘。

3. 查看Timer.purge()源碼

public class Timer {
   
    private final TaskQueue queue = new TaskQueue();

    // 保證被取消的task能及時進行垃圾回收
    public int purge() {
        int result = 0;
        synchronized(queue) {
            for (int i = queue.size(); i > 0; i--) {
                if (queue.get(i).state == TimerTask.CANCELLED) {
                    queue.quickRemove(i);
                    result++;
                }
            }
            if (result != 0)
                // 從新整理隊列中有效的任務
                queue.heapify();
        }
        return result;
    }

進一步查看queue.quickRemove(i)和queue.heapify()。

class TaskQueue {
 
    void quickRemove(int i) {
        queue[i] = queue[size];
        queue[size--] = null;  //清除無效任務,防止內存泄漏
    }

    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                    queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    void heapify() {
        for (int i = size/2; i >= 1; i--)
            fixDown(i);
    }

能夠看到Timer.purge()在持有queue鎖時主要作兩件事

1.及時清除隊列中無效的鬧鐘防止內存泄漏。

2.從新規整隊列中鬧鐘。

4. 最後看一下Timer.cancel()源碼

public class Timer {
  
    private final TaskQueue queue = new TaskQueue();

    private final TimerThread thread = new TimerThread(queue);

    public void cancel() {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.clear();
          //防止隊列爲空的狀況下,TimerThead無限等待
            queue.notify();  
        }
    }

該方法在清除全部鬧鐘的同時,與TimerThread發生了一次線程通訊——喚醒TimerThread並讓其永久退出。

private void mainLoop() {
    while (true) {
            synchronized(queue) {
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                if (queue.isEmpty())
                    break;  // TimerThread永久退出
                queue.wait(executionTime - currentTime);
            }
     }
}

以上是整個過程的靜態分析,如今捕捉一個線程快照進行動態分析。爲了dump一個特定時刻的線程快照,如今在Timer.sched()打一個斷點(注意斷點的方式與位置)。

以debug模式運行下面的例子。

public class ScheduleDemo {

    public static void main(String[] args) throws Exception {
        AlarmTask alarm1 = new AlarmTask("鬧鐘1");
        AlarmTask alarm2 = new AlarmTask("鬧鐘2");
        new Thread("線程1"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]調度鬧鐘1");
                timer.schedule(alarm1,delay,period);
            }
        }.start();
        new Thread("線程2"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]調度鬧鐘2");
                timer.schedule(alarm2,delay,period);
            }
        }.start();
        Thread.sleep(1500);
        new Thread("線程3"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]取消鬧鐘2");
                alarm2.cancel();
            }
        }.start();
        new Thread("線程4"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]清理無用鬧鐘");
                timer.purge();
            }
        }.start();
        new Thread("線程5"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]關閉全部鬧鐘");
                timer.cancel();
            }
        }.start();
    }

    /**
     *     模擬鬧鐘
     */
    static class AlarmTask extends TimerTask{
        String name ;
        public AlarmTask(String name){
            this.name=name;
        }
        public void run() {
            log.info("["+Thread.currentThread().getName()+"]-["+name+"]嘀。。。");
            Thread.sleep(1000); //模擬鬧鐘執行時間
        }
    }
}

下圖是visualVM工具dump出的線程快照(斷點處)

經過上面的快照能夠看到,當「線程1「(持有兩把鎖)處於RUNNABLE狀態時,」線程2「、「線程3」、「線程4」、「線程5」都處於BLOCKED狀態。須要注意的是,由於TimerThread的時間未到,暫時處於WATING狀態(等待喚醒)。

下面是一個簡單的形象圖

總結:Timer爲了保證線程安全,使用了大量的鎖機制,總體上對CPU的利用率不高。

相關文章
相關標籤/搜索