添加元素,先添加到數組末尾,而後上調整堆。數組
取對首元素,把最後一個元素放到0位置,而後下調整堆。多線程
移除中間元素,把最後一個元素放到中間位置,而後下調整堆,下調整堆沒動(已是最大的),就在上調整堆。下調整堆動了,就不須要上調整堆。異步
//也是一個線程池ThreadPoolExecutor。裏面的任務都是ScheduledFutureTask,隊列是DelayedWorkQueue。 //線程池都是異步的,只不過FutureTask的run方法會把結果放到FutureTak裏面去,而後經過這個返回的FutureTask獲取執行結果。 //普通的Runnbale若是能夠返回出去而且有返回值也能夠獲取結果。 //Periodic是週期任務。Delayed是一次性任務。 @SuppressWarnings("rawtypes") public class ScheduledThreadPoolExecutor1 extends ThreadPoolExecutor1 implements ScheduledExecutorService { //Shutdown以後,Periodic週期任務是否應該刪除或者取消,仍是繼續執行。true繼續,false不繼續刪除取消 private volatile boolean continueExistingPeriodicTasksAfterShutdown; //Shutdown以後,DelayedT一次性任務是否應該刪除或者取消,仍是繼續執行。true繼續,false不繼續刪除取消 private volatile boolean executeExistingDelayedTasksAfterShutdown = false; //ScheduledFutureTask的cancel()方法用到。FutureTask任務被取消後(FutureTask.cancel()調用),是否是應該從隊列刪除。 private volatile boolean removeOnCancel = false; //序列號中斷調度關係,進而保證綁定條目之間的FIFO順序。 private static final AtomicLong sequencer = new AtomicLong(); final long now() { return System.nanoTime(); } //給線程池設置參數 public ScheduledThreadPoolExecutor1(int corePoolSize) {//線程池的隊列是DelayedWorkQueue super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } //給線程池設置參數。線程池的隊列是DelayedWorkQueue public ScheduledThreadPoolExecutor1(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } //給線程池設置參數。線程池的隊列是DelayedWorkQueue public ScheduledThreadPoolExecutor1(int corePoolSize, RejectedExecutionHandler1 handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } //給線程池設置參數。線程池的隊列是DelayedWorkQueue public ScheduledThreadPoolExecutor1(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler1 handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } //週期任務periodic=true,一次性任務periodic=false boolean canRunInCurrentRunState(boolean periodic) { //週期任務:RUNNING返回true,SHUTDOWN看continueExistingPeriodic,true就true false就fasle。不是RUNNING不是SHUTDOWN返回false //一次性任務:RUNNING返回true,SHUTDOWN看executeExistingDelayed,true就true false就false。不是RUNNING不是SHUTDOWN返回false return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown:executeExistingDelayedTasksAfterShutdown); } //延遲或週期性執行任務。 //與ThreadPoolExecutor不一樣,這裏直接把任務加入延遲隊列。沒有像ThreadPoolExecutor同樣,woker滿了才放入隊列 private void delayedExecute(RunnableScheduledFuture<?> task) {//ScheduledFutureTask if (isShutdown()) reject(task); else { //此時線程池的Worker線程尚未建立,由於任務是延遲執行,到時候在建立線程池線程。如今只是丟到隊列裏面去。 super.getQueue().add(task); //進到canRunInCurrentRunState說明ctl>=SHUTDOWN, //continueExistingPeriodic|executeExistingDelayed是true,不刪除任務, //繼續開線程執行或者已經有足夠的線程了丟到隊列裏面去讓線程去執行, //continueExistingPeriodic|executeExistingDelayed是false,刪除任務 //ctl>SHUTDOWN,刪除任務 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) //從隊列移除任務成功,FutureTask.cancel() task.cancel(false); else //Worker線程數量<corePoolSize新建一個線程,大於corePoolSize不新建線程直接返回。 //開了一個Worker線程(初始任務是null),這個線程返回。一直會開到corePoolSize個線程就不開了。 ensurePrestart(); } } //週期性任務, 把任務再次加到隊列,而且又開一個線程 void reExecutePeriodic(RunnableScheduledFuture<?> task) { //RUNNING返回true,SHUTDOWN看continueExistingPeriodic|continueExistingPeriodic。 //不是RUNNING不是SHUTDOWN返回false if (canRunInCurrentRunState(true)) { super.getQueue().add(task);//又加入到隊列 //continueExistingPeriodic是true不刪除任務,continueExistingPeriodic是false就刪除任務, //不是RUNNING不是SHUTDOWN刪除任務。 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart();//又開線程去執行隊列 } } //關閉線程池的回調 public void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) {//keepDelayed=keepPeriodic=false for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false);//隊列的每一個任務都取消.FutureTask.cancel() q.clear(); } else {//keepDelayed keepPeriodic有一個爲true for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>) e; //true||:週期任務,keepPeriodic=fasle,shutdown以後中止任務。一次性任務,keepDelaye=fasle,shutdown以後中止任務。 //:刪除取消任務。 //false||true:週期任務|一次性任務,shutdown以後繼續任務,可是FutureTask任務已經被cancel(true|fasle)取消了。 //:刪除取消任務。 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { if (q.remove(t)) t.cancel(false);//FutureTask.cancel() } //fasle||fasle:週期任務|一次性任務,shutdown以後繼續任務,而且FutureTask任務沒有被cancel(true|fasle)取消。 //:不移除不取消任務。 } } } tryTerminate();//嘗試終止線程池 } //默認實現只返回給定的任務。 //而後在調用decorateTask進行包裝,該方法是留給用戶去擴展的,默認是個空方法 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { return task; } //默認實現只返回給定的任務。 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) { return task; } private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; } //延期執行,異步 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); //Runnable封裝成ScheduledFutureTask,返回這個ScheduledFutureTask RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t);// return t; } //延期執行,異步 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); //封裝任務ScheduledFutureTask RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t);//加入隊列開線程 return t; } //週期執行。initialDelay延遲屢次時間開始執行,period每屢次時間執行一次。 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),unit.toNanos(period));//任務封裝成ScheduledFutureTask RunnableScheduledFuture<Void> t = decorateTask(command, sft);//返回t=sft sft.outerTask = t;//outerTask=本身 delayedExecute(t); return t; } //週期執行,異步 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } // Override AbstractExecutorService methods public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors1.callable(task, result), 0, NANOSECONDS); } public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, NANOSECONDS); } public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { continueExistingPeriodicTasksAfterShutdown = value; //true&&true:shutdown以後不繼續週期任務,ctl>=SHUTDWON:取消隊列中全部的任務 if (!value && isShutdown()) onShutdown(); //fasle:shutdown以後繼續週期任務:不中止隊列的任務 //true&&false:shutdown以後不繼續週期任務,ctl線程池正常狀態:不中止隊列的任務 } public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { return continueExistingPeriodicTasksAfterShutdown; } public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { executeExistingDelayedTasksAfterShutdown = value; //true&&true:shutdown以後不繼續一次性任務,ctl>=SHUTDWON:取消隊列中全部的任務 if (!value && isShutdown()) onShutdown(); //fasle:shutdown以後繼續週期任務:不中止隊列的任務 //true&&false:shutdown以後不繼續週期任務,ctl線程池正常狀態:不中止隊列的任務 } public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { return executeExistingDelayedTasksAfterShutdown; } //取消任務時是否應當即從work queue工做隊列中刪除的策略。該值默認爲假。ScheduledFutureTask的cancel()方法用到。 //FutureTask任務被取消後(FutureTask.cancel()調用),是否是應該從隊列刪除。 public void setRemoveOnCancelPolicy(boolean value) { removeOnCancel = value; } public boolean getRemoveOnCancelPolicy() {//ScheduledFutureTask的cancel()方法用到。 return removeOnCancel; } public void shutdown() {//關閉線程池 super.shutdown(); } public List<Runnable> shutdownNow() { return super.shutdownNow(); } public BlockingQueue<Runnable> getQueue() { return super.getQueue(); } //是一個有序阻塞隊列,經過每一個任務按照距離下次執行時間間隔的大小來排序;線程池的隊列 static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; //ScheduledFutureTask數組,是一個數組,堆排序的數組,裏面的任務是ScheduledFutureTask private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock();//任務入隊的鎖 private int size = 0; //當該任務到觸發時間時,會喚醒不少woker線程,這顯然是沒有必要的。 private Thread leader = null;//取隊列任務的worker線程。 private final Condition available = lock.newCondition(); //若是是scheduledfuturetask,則設置f的heapIndex。 private void setIndex(RunnableScheduledFuture<?> f, int idx) { if (f instanceof ScheduledFutureTask) ((ScheduledFutureTask) f).heapIndex = idx; } private void siftUp(int k, RunnableScheduledFuture<?> key) {//key打算放到k位置 while (k > 0) { int parent = (k - 1) >>> 1;//父節點位置 RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0)//key大於父節點不交換,比較的是time, break; //小於等於父節點 queue[k] = e;//父節點放到k的位置 setIndex(e, k);//設置父節點索引爲k k = parent;//準備放到parent位置再比較 } queue[k] = key;//key放到k位置 setIndex(key, k);//設置key的堆索引=k } private void siftDown(int k, RunnableScheduledFuture<?> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; RunnableScheduledFuture<?> c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); } private void grow() { int oldCapacity = queue.length; int newCapacity = oldCapacity + (oldCapacity >> 1); // 1.5倍oldCapacity if (newCapacity < 0) // 溢出了 newCapacity = Integer.MAX_VALUE;//DelayedWorkQueue堵塞隊列是無限大的,幾乎爲無上限。因此,不存在最大線程數 //這裏沒有向用戶開放maximumPoolSize的設置,緣由是DelayedWorkQueue中的元素在大於初始容量16時,會進行擴容,也就是說隊列不會裝滿,maximumPoolSize參數即便設置了也不會生效。 queue = Arrays.copyOf(queue, newCapacity); } //查找給定對象的索引,若是不存在則爲-1。 private int indexOf(Object x) { if (x != null) { if (x instanceof ScheduledFutureTask) { int i = ((ScheduledFutureTask) x).heapIndex; if (i >= 0 && i < size && queue[i] == x) return i; } else {//沒有heapIndex屬性 for (int i = 0; i < size; i++) if (x.equals(queue[i])) return i; } } return -1; } public boolean contains(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { return indexOf(x) != -1; } finally { lock.unlock(); } } public boolean remove(Object x) { final ReentrantLock lock = this.lock;//其他線程不能操做隊列 lock.lock(); try { int i = indexOf(x); if (i < 0) return false; setIndex(queue[i], -1); int s = --size; RunnableScheduledFuture<?> replacement = queue[s]; queue[s] = null; if (s != i) { siftDown(i, replacement);//數組最後一個元素準備放到i位置,下調整堆 if (queue[i] == replacement)//沒動 siftUp(i, replacement);//再上調整堆 } return true; } finally { lock.unlock(); } } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return size; } finally { lock.unlock(); } } public boolean isEmpty() { return size() == 0; } public int remainingCapacity() { return Integer.MAX_VALUE; } public RunnableScheduledFuture<?> peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return queue[0]; } finally { lock.unlock(); } } public boolean offer(Runnable x) {//ScheduledFutureTask if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size;//隊列中實際個數 if (i >= queue.length)//大於16就擴容 grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0);//設置堆索引 } else {//i>=1 //把任務加入堆中,並調整堆結構,這裏就會根據任務的觸發時間排列 //把須要最先執行的任務放在前面 siftUp(i, e);//準備在數組最後位置插入e,根據time調整堆 } //若是新加入的元素就是隊列頭,這裏有兩種狀況 //1.這是用戶提交的第一個任務 //2.新任務進行堆調整之後,排在隊列頭(原來在最末尾,如今調整到第一個了) if (queue[0] == e) {//添加的任務是第一個 leader = null;// available.signal();//喚醒取隊列的worker線程 } } finally { lock.unlock(); } return true; } //在每次往優先級隊列中添加元素時以元素的過時時間做爲排序條件,最早過時的元素放在優先級最高。 public void put(Runnable e) { offer(e); } public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); } private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { int s = --size; RunnableScheduledFuture<?> x = queue[s];//最末尾的元素 queue[s] = null;//最後一個元素拿出來 if (s != 0)//s=0只有一個元素 siftDown(0, x);//最後一個元素拿出來,準備放到0位置,而後調整堆。 setIndex(f, -1);//設置任務的堆索引=-1就是數組的位置, return f; } public RunnableScheduledFuture<?> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first = queue[0]; if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } } //線程池的Worek線程從隊列中取任務,多線程訪問。能夠根據元素的過時時間來對元素進行排列,所以,先過時的元素會在隊首,每次從隊列裏取出來都是最早要過時的元素。 public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //列頭的元素是最早「到期」的元素,每次取的是queue[0],若是隊列裏面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時纔可以從隊列中取元素。 RunnableScheduledFuture<?> first = queue[0]; if (first == null) //若是隊列爲空,則阻塞等待加入元素時喚醒 //線程池中的worker線程從隊列獲取任務去執行:workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(),沒有任務這個線程池線程就阻塞在這裏。 available.await(); else { long delay = first.getDelay(NANOSECONDS);//判斷還剩下多少時間 if (delay <= 0) return finishPoll(first);//成功得到任務 first = null; // don't retain ref while waiting //這裏表示該任務已經分配給了其餘線程,當前線程等待喚醒就能夠 if (leader != null)//讓leader去取 available.await();//不是leader的線程是死等,而不是等待多長時間,只有leader是等待多長時間。worker取任務線程。 else { //不然把給任務分配給當前線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { //leader線程在這裏等待,第一個阻塞等待的線程,第一個取的線程。必定要leader線程喚醒後去取,其餘線程才能去取,而後成爲新的leader。 available.awaitNanos(delay);//Worker線程取任務阻塞 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal();//Worker線程取任務喚醒 lock.unlock(); } } public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = 0; i < size; i++) { RunnableScheduledFuture<?> t = queue[i]; if (t != null) { queue[i] = null; setIndex(t, -1); } } size = 0; } finally { lock.unlock(); } } private RunnableScheduledFuture<?> peekExpired() { // assert lock.isHeldByCurrentThread(); RunnableScheduledFuture<?> first = queue[0]; return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first; } public int drainTo(Collection<? super Runnable> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first; int n = 0; while ((first = peekExpired()) != null) { c.add(first); // In this order, in case add() throws. finishPoll(first); ++n; } return n; } finally { lock.unlock(); } } public int drainTo(Collection<? super Runnable> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first; int n = 0; while (n < maxElements && (first = peekExpired()) != null) { c.add(first); // In this order, in case add() throws. finishPoll(first); ++n; } return n; } finally { lock.unlock(); } } public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return Arrays.copyOf(queue, size, Object[].class); } finally { lock.unlock(); } } @SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { if (a.length < size) return (T[]) Arrays.copyOf(queue, size, a.getClass()); System.arraycopy(queue, 0, a, 0, size); if (a.length > size) a[size] = null; return a; } finally { lock.unlock(); } } public Iterator<Runnable> iterator() { return new Itr(Arrays.copyOf(queue, size)); } private class Itr implements Iterator<Runnable> { final RunnableScheduledFuture<?>[] array; int cursor = 0; int lastRet = -1; Itr(RunnableScheduledFuture<?>[] array) { this.array = array; } public boolean hasNext() { return cursor < array.length; } public Runnable next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); DelayedWorkQueue.this.remove(array[lastRet]); lastRet = -1; } } } //是一個能夠把結果放在outcome的FutureTask,經過FutureTask的outcome返回結果。 public class ScheduledFutureTask<V> extends FutureTask1<V> implements RunnableScheduledFuture<V> { //數組的下邊位置,-1表示再也不數組裏面了 int heapIndex; //給ScheduledThreadPoolExecutor池子中添加的任務的序號,從0開始加1, private final long sequenceNumber; private long time;//延遲屢次時間開始執行, //以納秒爲單位的重複任務週期。 //正值表示固定速率執行。 //負值表示固定延遲執行。 //值0表示非重複任務。 private final long period;//每屢次時間執行一次。 //由ReexecutePeriodic從新排隊的實際任務 RunnableScheduledFuture<V> outerTask = this;//ScheduledFutureTask ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result);//設置FutureTask的callable和state。任務是封裝在了ScheduledFutureTask的父類FutureTask中了, this.time = ns;//延遲屢次時間開始執行, this.period = 0;//每0時間執行一次。一次性任務 this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result);//設置FutureTask的callable和state。 this.time = ns;//延遲屢次時間開始執行, this.period = period;//每屢次時間執行一次。 this.sequenceNumber = sequencer.getAndIncrement();//給ScheduledThreadPoolExecutor池子中添加的任務的序號 } ScheduledFutureTask(Callable<V> callable, long ns) { super(callable);//設置FutureTask的callable和state。 this.time = ns; this.period = 0;//每0時間執行一次。一次性任務 this.sequenceNumber = sequencer.getAndIncrement();//給ScheduledThreadPoolExecutor池子中添加的任務的序號,sequencer加1,sequenceNumber仍是加1以前的值。 } public long getDelay(TimeUnit unit) {//距離下次任務執行時間的時間間隔; return unit.convert(time - now(), NANOSECONDS); } //小於0本身小,大於0本身大,等於0相等 public int compareTo(Delayed other) {//用於比較任務之間的優先級關係,若是距離下次執行的時間間隔較短,則優先級高; if (other == this) // 同一個對象返回0 return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other; long diff = time - x.time; if (diff < 0)//本身小 return -1; else if (diff > 0)//本身大 return 1; //時間相等,看sequenceNumber序列號 else if (sequenceNumber < x.sequenceNumber) return -1;//本身小 else return 1;//本身大 } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } //這是一個週期性的(不是一次性的)動做。period=0一次性。period public boolean isPeriodic() { return period != 0; } //設置下次運行按期任務的時間。每5秒執行一次,period=5,下一次執行時間=time+5 //scheduleAtFixedRate會執行到狀況一,下一次任務的啓動時間最先爲上一次任務的啓動時間加period。 //scheduleWithFixedDelay會執行到狀況二,這裏很巧妙的將period參數設置爲負數到達這段代碼塊,在此又將負的period轉爲正數。狀況二將下一次任務的啓動時間設置爲當前時間加period。 private void setNextRunTime() { long p = period; if (p > 0) time += p; //scheduleAtFixedRate方法提交任務時,任務後續執行的延遲時間都已經肯定好了,分別是initialDelay,initialDelay + period,initialDelay + 2 * period以此類推。 else //scheduleWithFixedDelay方法提交任務時,第一次執行的延遲時間爲initialDelay,後面的每次執行時間都是在前一次任務執行完成之後的時間點上面加上period延遲執行。 time = triggerTime(-p); } //1.添加任務到隊列以後,線程池處於關閉狀態,移除任務成功後,就去取消任務。 public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); if (cancelled && removeOnCancel && heapIndex >= 0)//heapIndex >= 0這個任務還在數組裏面 remove(this); return cancelled; } //線程池裏面task.run()就是ScheduledFutureTask.run,轉而super就是FutureTask.run,就是真正Runnable.run public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic)//不是週期的 ScheduledFutureTask.super.run();//真正Runnable.run else if (ScheduledFutureTask.super.runAndReset()) {//是週期的,runAndReset沒有設置返回值 setNextRunTime(); reExecutePeriodic(outerTask);//outerTask是任務本身。又把任務加到隊列,若是線程池中worker線程數<corepooolsize,就又開一個線程池線程。注意:這個新開的線程池線程,是調用task.run()就是ScheduledFutureTask.run的線程池線程開的。 } } } }
@SuppressWarnings({ "rawtypes", "unchecked" }) public static void main(String[] args) throws Exception { // 建立corepoolSize=2的線程池 ScheduledExecutorService scheduledThreadPool = Executors1.newScheduledThreadPool(2); new Thread(new Runnable() { @Override public void run() { scheduledThreadPool.shutdown(); } }).start(); FutureTask1 t1 = (ScheduledFutureTask) scheduledThreadPool.schedule( new Callable<String>() { @Override public String call() throws Exception { return "Callable1" ; } }, 300, TimeUnit.SECONDS);//返回將Callable封裝的FutureTask t1.get();//從FutureTask獲取結果 t1.cancel(true);//從FutureTask取消任務 // 10秒後執行,週期性執行,每5秒執行一次 FutureTask1 t3 = (FutureTask1) scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("Runnable"); } }, 10, 5, TimeUnit.SECONDS); t3.get();//獲取不到結果,一直卡住 // 10秒後執行,一次性任務 FutureTask1 t4 = (FutureTask1) scheduledThreadPool.scheduleWithFixedDelay(new Runnable() { @Override public void run() { System.out.println("Runnable"); } }, 10, 5, TimeUnit.SECONDS); t4.get(); scheduledThreadPool.execute(new Runnable() { @Override public void run() { System.out.println("Runnable"); } }); scheduledThreadPool.submit(new Runnable() { @Override public void run() { System.out.println("Runnable"); } }); scheduledThreadPool.submit(new Runnable() { @Override public void run() { System.out.println("Runnable"); } }, "指望值"); scheduledThreadPool.submit(new Callable<String>() { @Override public String call() throws Exception { return "Callable"; } }); } }