深刻JDK源碼之定時操做Timer類和TimerTask類實現

Timer類是一種線程設施,能夠用來實現某一個時間或某一段時間後安排某一個任務執行一次或按期重複執行。該功能和TimerTask配合使用。TimerTask類用於實現由Timer安排的一次或重複執行的某個任務。每個Timer對象對應的是一個線程,所以計時器所執行的任務應該迅速完成,不然會延遲後續的任務執行。 ##JDK源碼TimerTask類## 這個類是個抽象類比較簡單,有四個常量表示定時器任務的狀態,還有一個Object類型lock對象,至關一把鎖,控制線程對定時器任務狀態的同步訪問。 nextExecutionTime:這個成員變量用到記錄該任務下次執行時間, 其格式和System.currentTimeMillis()一致。 這個值是做爲任務隊列中任務排序的依據. 任務調試者執行每一個任務前會對這個值做處理,從新計算下一次任務執行時間,併爲這個變量賦值.。 period:用來描述任務的執行方式: 0表示不重複執行的任務. 正數表示固定速率執行的任務. 負數表示固定延遲執行的任務。(固定速率: 不考慮該任務上一次執行狀況,始終從開始時間算起的每period執行下一次. 固定延遲: 考慮該任務一次執行狀況,在上一次執行後period執行下一次)。java

public abstract class TimerTask implements Runnable {
    //這個對象是用來控制訪問TimerTask內部構件。鎖機制
    final Object lock = new Object();
    //定時器任務的狀態
    int state = VIRGIN;
    //定時器任務默認的狀態,表示尚未被安排
    static final int VIRGIN = 0;
    //表示定時器任務被安排了
    static final int SCHEDULED   = 1;
    //表示定時器任務執行
    static final int EXECUTED    = 2;
    //表示定時器任務取消
    static final int CANCELLED   = 3;
    //下次執行任務時間
    long nextExecutionTime;
    long period = 0;

    protected TimerTask() {
    }
    // 此計時器任務要執行的操做。
    public abstract void run();

    // 取消此計時器任務。
    public boolean cancel() {
        synchronized(lock) {
            boolean result = (state == SCHEDULED);
            state = CANCELLED;
            return result;
        }
    }
    // 返回此任務最近實際 執行的已安排 執行時間。
    public long scheduledExecutionTime() {
        synchronized(lock) {
            return (period < 0 ? nextExecutionTime + period
                            : nextExecutionTime - period);
        }
    }
}

##深刻JDK源碼之Timer類## Timer中最主要由三個部分組成: 任務 TimerTask 、 任務隊列: TaskQueue queue 和 任務調試者:TimerThread thread。 ###任務隊列TaskQueue,它是Timer的一個內部類### 事實上任務隊列是一個數組, 採用平衡二叉堆來實現他的優先級調度, 而且是一個小頂堆. 須要注意的是, 這個堆中queue[n] 的孩子是queue[2n] 和 queue[2n+1]。api

任務隊列的優先級按照TimerTask類的成員變量nextExecutionTime值來排序(注意, 這裏的任務指的是那些交由定時器來執行的, 繼承TimerTask的對象).數組

在任務隊列中, nextExecutionTime最小就是全部任務中最先要被調度來執行的, 因此被安排在queue[1] (假設任務隊列非空).oop

對於堆中任意一個節點n, 和他的任意子孫節點d,必定遵循: n.nextExecutionTime <= d.nextExecutionTime.測試

// 任務隊列
class TaskQueue {
    // 計時器任務數組,默認大小爲128
    private TimerTask[] queue = new TimerTask[128];

    private int size = 0;

    int size() {
        return size;
    }

    // 加入隊列
    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            // 隊列以兩倍的速度擴容
            queue = Arrays.copyOf(queue, 2 * queue.length);

        queue[++size] = task;
        fixUp(size);
    }

    // 獲取隊列的元素,即第一個任務,第一個元素存儲的是
    TimerTask getMin() {
        return queue[1];
    }

    TimerTask get(int i) {
        return queue[i];
    }

    // 消除頭任務從優先隊列。
    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null; // Drop extra reference to prevent memory leak
        fixDown(1);
    }

    /**
     * Removes the ith element from queue without regard for maintaining the
     * heap invariant. Recall that queue is one-based, so 1 <= i <= size.
     */
    void quickRemove(int i) {
        // 斷言,在這裏只起測試做用
        assert i <= size;

        queue[i] = queue[size];
        queue[size--] = null; // Drop extra ref to prevent memory leak
    }

    /**
     * Sets the nextExecutionTime associated with the head task to the specified
     * value, and adjusts priority queue accordingly.
     */
    void rescheduleMin(long newTime) {
        queue[1].nextExecutionTime = newTime;
        fixDown(1);
    }

    /**
     * Returns true if the priority queue contains no elements.
     */
    boolean isEmpty() {
        return size == 0;
    }

    /**
     * Removes all elements from the priority queue.
     */
    void clear() {
        // Null out task references to prevent memory leak
        for (int i = 1; i <= size; i++)
            queue[i] = null;
        size = 0;
    }

    // 進行隊列中任務優先級調整. fixUp方法的做用是儘可能將隊列中指定位置(k)的任務向隊列前面移動,
    // 即提升它的優先級. 由於新加入的方法頗有可能比已經在任務隊列中的其它任務要更早執行.
    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;// 左移一位,至關於除以2
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];
            queue[j] = queue[k];
            queue[k] = tmp;
            k = j;
        }
    }

    // 從任務隊列中移除一個任務的過程, 首先直接將當前任務隊列中最後一個任務賦給queue[1],
    // 而後將隊列中任務數量--, 最後和上面相似, 可是這裏是調用fixDown(int k)方法了, 儘可能將k位置的任務向隊列後面移動.
    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size && queue[j].nextExecutionTime > queue[j + 1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];
            queue[j] = queue[k];
            queue[k] = tmp;
            k = j;
        }
    }

    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     */
    void heapify() {
        for (int i = size / 2; i >= 1; i--)
            fixDown(i);
    }
}

###任務調度 TimerThread###ui

// 計時器線程
class TimerThread extends Thread {

	// 新任務是否被安排
	boolean newTasksMayBeScheduled = true;

	// 任務隊列
	private TaskQueue queue;

	TimerThread(TaskQueue queue) {
		this.queue = queue;
	}

	public void run() {
		try {
			mainLoop();
		} finally {
			// Someone killed this Thread, behave as if Timer cancelled
			synchronized (queue) {
				newTasksMayBeScheduled = false;
				queue.clear(); // Eliminate obsolete references
			}
		}
	}

	private void mainLoop() {
		while (true) {
			try {
				TimerTask task;
				boolean taskFired;
				synchronized (queue) {
					// Wait for queue to become non-empty
					while (queue.isEmpty() && newTasksMayBeScheduled)
						queue.wait();
					if (queue.isEmpty())
						break; // Queue is empty and will forever remain; die

					// Queue nonempty; look at first evt and do the right thing
					long currentTime, executionTime;
					task = queue.getMin();
					synchronized (task.lock) {
						if (task.state == TimerTask.CANCELLED) {
							queue.removeMin();
							continue; // No action required, poll queue again
						}
						currentTime = System.currentTimeMillis();
						executionTime = task.nextExecutionTime;
						if (taskFired = (executionTime <= currentTime)) {
							if (task.period == 0) { // Non-repeating, remove
								queue.removeMin();
								task.state = TimerTask.EXECUTED;
							} else { // Repeating task, reschedule
								queue
										.rescheduleMin(task.period < 0 ? currentTime
												- task.period
												: executionTime + task.period);
							}
						}
					}
					if (!taskFired) // Task hasn't yet fired; wait
						queue.wait(executionTime - currentTime);
				}
				if (taskFired) // Task fired; run it, holding no locks
					task.run();
			} catch (InterruptedException e) {
			}
		}
	}
}

###Timer類的主體和主要對外提供的方法###this

import java.util.*;
import java.util.Date;

public class Timer {
	// 定時任務隊列
	private TaskQueue queue = new TaskQueue();

	// 計時器線程
	private TimerThread thread = new TimerThread(queue);

	private Object threadReaper = new Object() {
		protected void finalize() throws Throwable {
			synchronized (queue) {
				thread.newTasksMayBeScheduled = false;
				queue.notify(); // In case queue is empty.
			}
		}
	};

	// ID號做爲線程的ID
	private static int nextSerialNumber = 0;

	private static synchronized int serialNumber() {
		return nextSerialNumber++;
	}

	public Timer() {
		this("Timer-" + serialNumber());
	}

	// 建立一個新計時器,能夠指定其相關的線程做爲守護程序運行。
	public Timer(boolean isDaemon) {
		this("Timer-" + serialNumber(), isDaemon);
	}

	public Timer(String name) {
		thread.setName(name);
		thread.start();
	}

	// 建立一個新計時器,其相關的線程具備指定的名稱,而且能夠指定做爲守護程序運行。
	public Timer(String name, boolean isDaemon) {
		thread.setName(name);
		thread.setDaemon(isDaemon);
		thread.start();
	}

	// 安排在指定延遲後執行指定的任務。時間單位毫秒
	public void schedule(TimerTask task, long delay) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		sched(task, System.currentTimeMillis() + delay, 0);
	}

	// 安排在指定的時間執行指定的任務。
	public void schedule(TimerTask task, Date time) {
		sched(task, time.getTime(), 0);
	}

	// 安排指定的任務從指定的延遲後開始進行重複的固定延遲執行。
	public void schedule(TimerTask task, long delay, long period) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, System.currentTimeMillis() + delay, -period);
	}

	// 安排指定的任務在指定的時間開始進行重複的固定延遲執行。
	public void schedule(TimerTask task, Date firstTime, long period) {
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, firstTime.getTime(), -period);
	}

	// 安排指定的任務在指定的延遲後開始進行重複的固定速率執行。
	public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, System.currentTimeMillis() + delay, period);
	}

	// 安排指定的任務在指定的時間開始進行重複的固定速率執行。
	public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, firstTime.getTime(), period);
	}
	private void sched(TimerTask task, long time, long period) {
		if (time < 0)
			throw new IllegalArgumentException("Illegal execution time.");
		// 同步代碼塊 ,對queue的訪問須要同步
		synchronized (queue) {
			if (!thread.newTasksMayBeScheduled)
				throw new IllegalStateException("Timer already cancelled.");
			// 同步代碼塊,須要得到task的lock,鎖
			synchronized (task.lock) {
				if (task.state != TimerTask.VIRGIN)
					throw new IllegalStateException(
							"Task already scheduled or cancelled");
				// 任務接下來執行的時刻
				task.nextExecutionTime = time;
				// 任務執行時間間隔週期
				task.period = period;
				// 任務已經安排,等待執行
				task.state = TimerTask.SCHEDULED;
			}
			// 加入計時器等待任務隊列
			queue.add(task);
			//
			if (queue.getMin() == task)
				// 喚醒在此對象監視器上等待的單個線程。
				queue.notify();
		}
	}

	// 終止此計時器,丟棄全部當前已安排的任務。
	public void cancel() {
		synchronized (queue) {
			thread.newTasksMayBeScheduled = false;
			queue.clear();
			queue.notify(); // In case queue was already empty.
		}
	}

	// 今後計時器的任務隊列中移除全部已取消的任務。
	public int purge() {
		int result = 0;

		synchronized (queue) {
			for (int i = queue.size(); i > 0; i--) {
				if (queue.get(i).state == TimerTask.CANCELLED) {
					queue.quickRemove(i);
					result++;
				}
			}

			if (result != 0)
				queue.heapify();
		}

		return result;
	}
}
相關文章
相關標籤/搜索