目錄html
正文java
咱們知道線程池運行時,會不斷從任務隊列中獲取任務,而後執行任務。若是咱們想實現延時或者定時執行任務,重要一點就是任務隊列會根據任務延時時間的不一樣進行排序,延時時間越短地就排在隊列的前面,先被獲取執行。數組
隊列是先進先出的數據結構,就是先進入隊列的數據,先被獲取。可是有一種特殊的隊列叫作優先級隊列,它會對插入的數據進行優先級排序,保證優先級越高的數據首先被獲取,與數據的插入順序無關。安全
實現優先級隊列高效經常使用的一種方式就是使用堆。數據結構
回到頂部多線程
堆一般是一個能夠被看作一棵樹的數組對象。併發
堆(heap)又被爲優先隊列(priority queue)。儘管名爲優先隊列,但堆並非隊列。動畫
由於隊列中容許的操做是先進先出(FIFO),在隊尾插入元素,在隊頭取出元素。this
而堆雖然在堆底插入元素,在堆頂取出元素,可是堆中元素的排列不是按照到來的前後順序,而是按照必定的優先順序排列的。spa
這裏來講明一下滿二叉樹的概念與徹底二叉樹的概念。
除了葉子節點,全部的節點的左右孩子都不爲空,就是一棵滿二叉樹,以下圖。
能夠看出:滿二叉樹全部的節點都擁有左孩子,又擁有右孩子。
不必定是一個滿二叉樹,但它不滿的那部分必定在右下側,以下圖
堆老是知足下列性質:
堆中某個節點的值老是不大於或不小於其父節點的值;
堆老是一棵徹底二叉樹。
堆是一個二叉樹,可是它最簡單的方式是經過數組去實現二叉樹,並且由於堆是一個徹底二叉樹,就不存在數組空間的浪費。怎麼使用數組來存儲二叉樹呢?
就是用數組的下標來模擬二叉樹的各個節點,好比說根節點就是0,第一層的左節點是1,右節點是2。由此咱們能夠得出下列公式:
1 // 對於n位置的節點來講: 2 int left = 2 * n + 1; // 左子節點 3 int right = 2 * n + 2; // 右子節點 4 int parent = (n - 1) / 2; // 父節點,固然n要大於0,根節點是沒有父節點的
對於堆來講,只有兩個操做,插入insert和刪除remove,無論插入仍是刪除保證堆的成立條件,1.是徹底二叉樹,2.父節點的值不能小於子節點的值。
1 public void insert(int value) { 2 // 第一步將插入的值,直接放在最後一個位置。並將長度加一 3 store[size++] = value; 4 // 獲得新插入值所在位置。 5 int index = size - 1; 6 while(index > 0) { 7 // 它的父節點位置座標 8 int parentIndex = (index - 1) / 2; 9 // 若是父節點的值小於子節點的值,你不知足堆的條件,那麼就交換值 10 if (store[index] > store[parentIndex]) { 11 swap(store, index, parentIndex); 12 index = parentIndex; 13 } else { 14 // 不然表示這條路徑上的值已經知足降序,跳出循環 15 break; 16 } 17 } 18 }
主要步驟:
直接將value插入到size位置,並將size自增,這樣store數組中插入一個值了。
要保證從這個葉節點到根節點這條路徑上的節點,知足父節點的值不能小於子節點。
經過int parentIndex = (index - 1) / 2獲得父節點,若是比父節點值大,那麼二者位置的值交換,而後再拿這個父節點和它的父父節點比較。
直到這個節點值比父節點值小,或者這個節點已是根節點就退出循環。
由於每次循環index都是除以2這種倍數遞減的方式,因此它最多循環次數是(log N)次。
1 public int remove() { 2 // 將根的值記錄,最後返回 3 int result = store[0]; 4 // 將最後位置的值放到根節點位置 5 store[0] = store[--size]; 6 int index = 0; 7 // 經過循環,保證父節點的值不能小於子節點。 8 while(true) { 9 int leftIndex = 2 * index + 1; // 左子節點 10 int rightIndex = 2 * index + 2; // 右子節點 11 // leftIndex >= size 表示這個子節點尚未值。 12 if (leftIndex >= size) break; 13 int maxIndex = leftIndex; 14 //找到左右節點中較大的一個節點 15 if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex; 16 //與子節點中較大的子節點比較,若是子節點更大,則交換位置 17 //爲何要與較大的子節點比較呢?若是和較小的節點比較,沒有交換位置,但有可能比較大的節點小 18 if (store[index] < store[maxIndex]) { 19 swap(store, index, maxIndex); 20 index = maxIndex; 21 } else { 22 //知足子節點比當前節點小,退出循環 23 break; 24 } 25 } 26 //返回最開始的第一個值 27 return result; 28 }
在堆中最大值就在根節點,因此操做步驟:
將根節點的值保存到result中。
將最後節點的值移動到根節點,再將長度減一,這樣知足堆成立第一個條件,堆是一個徹底二叉樹。
使用循環,來知足堆成立的第二個條件,父節點的值不能小於子節點的值。
最後返回result。
每次循環咱們都是以2的倍數遞增,因此它也是最多循環次數是(log N)次。
因此經過堆這種方式能夠快速實現優先級隊列,它的插入和刪除操做的效率都是O(log N)。
那麼怎麼實現堆排序?這個很簡單,利用優先隊列的特性:
1 private static void headSort(int[] arr) { 2 int size = arr.length; 3 Head head = new Head(size); 4 for (int i = 0; i < size; i++) { 5 head.insert(arr[i]); 6 } 7 for (int i = 0; i < size; i++) { 8 // 實現從大到小的排序 9 arr[size - 1 - i] = head.remove(); 10 } 11 }
堆排序的效率:由於每次插入數據效率是O(log N),而咱們須要進行n次循環,將數組中每一個值插入到堆中,因此它的執行時間是O(N * log N)級。
1 static class DelayedWorkQueue extends AbstractQueue<Runnable> 2 implements BlockingQueue<Runnable> {
從定義中看出DelayedWorkQueue是一個阻塞隊列。而且DelayedWorkQueue是一個最小堆,最頂點的值最小,即堆中某個節點的值老是不小於其父節點的值。
1 // 初始時,數組長度大小。 2 private static final int INITIAL_CAPACITY = 16; 3 // 使用數組來儲存隊列中的元素。 4 private RunnableScheduledFuture<?>[] queue = 5 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 6 // 使用lock來保證多線程併發安全問題。 7 private final ReentrantLock lock = new ReentrantLock(); 8 // 隊列中儲存元素的大小 9 private int size = 0; 10 11 //特指隊列頭任務所在線程 12 private Thread leader = null; 13 14 // 當隊列頭的任務延時時間到了,或者有新的任務變成隊列頭時,用來喚醒等待線程 15 private final Condition available = lock.newCondition();
DelayedWorkQueue是用數組來儲存隊列中的元素,那麼咱們看看它是怎麼實現優先級隊列的。
1 public void put(Runnable e) { 2 offer(e); 3 } 4 5 public boolean add(Runnable e) { 6 return offer(e); 7 } 8 9 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 10 return offer(e); 11 }
咱們發現與普通阻塞隊列相比,這三個添加方法都是調用offer方法。那是由於它沒有隊列已滿的條件,也就是說能夠不斷地向DelayedWorkQueue添加元素,當元素個數超過數組長度時,會進行數組擴容。
1 public boolean offer(Runnable x) { 2 if (x == null) 3 throw new NullPointerException(); 4 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 5 // 使用lock保證併發操做安全 6 final ReentrantLock lock = this.lock; 7 lock.lock(); 8 try { 9 int i = size; 10 // 若是要超過數組長度,就要進行數組擴容 11 if (i >= queue.length) 12 // 數組擴容 13 grow(); 14 // 將隊列中元素個數加一 15 size = i + 1; 16 // 若是是第一個元素,那麼就不須要排序,直接賦值就好了 17 if (i == 0) { 18 queue[0] = e; 19 setIndex(e, 0); 20 } else { 21 // 調用siftUp方法,使插入的元素變得有序。 22 siftUp(i, e); 23 } 24 // 表示新插入的元素是隊列頭,更換了隊列頭, 25 // 那麼就要喚醒正在等待獲取任務的線程。 26 if (queue[0] == e) { 27 leader = null; 28 // 喚醒正在等待等待獲取任務的線程 29 available.signal(); 30 } 31 } finally { 32 lock.unlock(); 33 } 34 return true; 35 }
數組擴容方法:
1 private void grow() { 2 int oldCapacity = queue.length; 3 // 每次擴容增長原來數組的一半數量。 4 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 5 if (newCapacity < 0) // overflow 6 newCapacity = Integer.MAX_VALUE; 7 // 使用Arrays.copyOf來複制一個新數組 8 queue = Arrays.copyOf(queue, newCapacity); 9 }
插入元素排序siftUp方法:
1 private void siftUp(int k, RunnableScheduledFuture<?> key) { 2 // 當k==0時,就到了堆二叉樹的根節點了,跳出循環 3 while (k > 0) { 4 // 父節點位置座標, 至關於(k - 1) / 2 5 int parent = (k - 1) >>> 1; 6 // 獲取父節點位置元素 7 RunnableScheduledFuture<?> e = queue[parent]; 8 // 若是key元素大於父節點位置元素,知足條件,那麼跳出循環 9 // 由於是從小到大排序的。 10 if (key.compareTo(e) >= 0) 11 break; 12 // 不然就將父節點元素存放到k位置 13 queue[k] = e; 14 // 這個只有當元素是ScheduledFutureTask對象實例纔有用,用來快速取消任務。 15 setIndex(e, k); 16 // 從新賦值k,尋找元素key應該插入到堆二叉樹的那個節點 17 k = parent; 18 } 19 // 循環結束,k就是元素key應該插入的節點位置 20 queue[k] = key; 21 setIndex(key, k); 22 }
主要是三步:
咱們來看看動畫
假設現有元素 5 須要插入,爲了維持徹底二叉樹的特性,新插入的元素必定是放在結點 6 的右子樹;同時爲了知足任一結點的值要小於左右子樹的值這一特性,新插入的元素要和其父結點做比較,若是比父結點小,就要把父結點拉下來頂替當前結點的位置,本身則依次不斷向上尋找,找到比本身大的父結點就拉下來,直到沒有符合條件的值爲止。
動畫講解:
在這裏先將元素 5 插入到末尾,即放在結點 6 的右子樹。
而後與父類比較, 6 > 5 ,父類數字大於子類數字,子類與父類交換。
重複此操做,直到不發生替換。
1 public RunnableScheduledFuture<?> poll() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 RunnableScheduledFuture<?> first = queue[0]; 6 // 隊列頭任務是null,或者任務延時時間沒有到,都返回null 7 if (first == null || first.getDelay(NANOSECONDS) > 0) 8 return null; 9 else 10 // 移除隊列頭元素 11 return finishPoll(first); 12 } finally { 13 lock.unlock(); 14 } 15 }
1 public long getDelay(TimeUnit unit) { 2 return unit.convert(time - now(), NANOSECONDS); 3 }
當隊列頭任務是null,或者任務延時時間沒有到,表示這個任務還不能返回,所以直接返回null。不然調用finishPoll方法,移除隊列頭元素並返回。
1 // 移除隊列頭元素 2 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 3 // 將隊列中元素個數減一 4 int s = --size; 5 // 獲取隊列末尾元素x 6 RunnableScheduledFuture<?> x = queue[s]; 7 // 原隊列末尾元素設置爲null 8 queue[s] = null; 9 if (s != 0) 10 // 將隊列最後一個元素移動到對列頭元素位置,而後向下排序 11 // 由於移除了隊列頭元素,因此進行從新排序。 12 siftDown(0, x); 13 setIndex(f, -1); 14 return f; 15 }
這個方法與咱們在第一節中,介紹堆的刪除方法同樣。
- 先將隊列中元素個數減一。
- 將原隊列末尾元素設置成隊列頭元素,再將隊列末尾元素設置爲null。
- 調用siftDown(0, x)方法,保證按照元素的優先級排序。
移除元素排序siftDown方法:
1 private void siftDown(int k, RunnableScheduledFuture<?> key) { 2 int half = size >>> 1; 3 // 經過循環,保證父節點的值不能大於子節點。 4 while (k < half) { 5 // 左子節點, 至關於 (k * 2) + 1 6 int child = (k << 1) + 1; 7 // 左子節點位置元素 8 RunnableScheduledFuture<?> c = queue[child]; 9 // 右子節點, 至關於 (k * 2) + 2 10 int right = child + 1; 11 // 若是左子節點元素值大於右子節點元素值,那麼右子節點纔是較小值的子節點。 12 // 就要將c與child值從新賦值 13 if (right < size && c.compareTo(queue[right]) > 0) 14 c = queue[child = right]; 15 // 若是父節點元素值小於較小的子節點元素值,那麼就跳出循環 16 if (key.compareTo(c) <= 0) 17 break; 18 // 不然,父節點元素就要和子節點進行交換 19 queue[k] = c; 20 setIndex(c, k); 21 k = child; 22 } 23 // 循環結束,k就是元素key應該插入的節點位置 24 queue[k] = key; 25 setIndex(key, k); 26 }
咱們來看看動畫
核心點:將最後一個元素填充到堆頂,而後不斷的下沉這個元素。
假設要從節點 1 ,也能夠稱爲取出節點 1 ,爲了維持徹底二叉樹的特性 ,咱們將最後一個元素 6 去替代這個 1 ;而後比較 1 和其子樹的大小關係,若是比左右子樹大(若是存在的話),就要從左右子樹中找一個較小的值替換它,而它能本身就要跑到對應子樹的位置,再次循環這種操做,直到沒有子樹比它小。
經過這樣的操做,堆依然是堆,總結一下:
1 public RunnableScheduledFuture<?> take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); 4 try { 5 for (;;) { 6 RunnableScheduledFuture<?> first = queue[0]; 7 // 若是沒有任務,就讓線程在available條件下等待。 8 if (first == null) 9 available.await(); 10 else { 11 // 獲取任務的剩餘延時時間 12 long delay = first.getDelay(NANOSECONDS); 13 // 若是延時時間到了,就返回這個任務,用來執行。 14 if (delay <= 0) 15 return finishPoll(first); 16 // 將first設置爲null,當線程等待時,不持有first的引用 17 first = null; // don't retain ref while waiting 18 19 // 若是仍是原來那個等待隊列頭任務的線程, 20 // 說明隊列頭任務的延時時間尚未到,繼續等待。 21 if (leader != null) 22 available.await(); 23 else { 24 // 記錄一下當前等待隊列頭任務的線程 25 Thread thisThread = Thread.currentThread(); 26 leader = thisThread; 27 try { 28 // 當任務的延時時間到了時,可以自動超時喚醒。 29 available.awaitNanos(delay); 30 } finally { 31 if (leader == thisThread) 32 leader = null; 33 } 34 } 35 } 36 } 37 } finally { 38 if (leader == null && queue[0] != null) 39 // 喚醒等待任務的線程 40 available.signal(); 41 lock.unlock(); 42 } 43 }
若是隊列中沒有任務,那麼就讓當前線程在available條件下等待。若是隊列頭任務的剩餘延時時間delay大於0,那麼就讓當前線程在available條件下等待delay時間。
1 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 2 throws InterruptedException { 3 long nanos = unit.toNanos(timeout); 4 final ReentrantLock lock = this.lock; 5 lock.lockInterruptibly(); 6 try { 7 for (;;) { 8 RunnableScheduledFuture<?> first = queue[0]; 9 // 若是沒有任務。 10 if (first == null) { 11 // 超時時間已到,那麼就直接返回null 12 if (nanos <= 0) 13 return null; 14 else 15 // 不然就讓線程在available條件下等待nanos時間 16 nanos = available.awaitNanos(nanos); 17 } else { 18 // 獲取任務的剩餘延時時間 19 long delay = first.getDelay(NANOSECONDS); 20 // 若是延時時間到了,就返回這個任務,用來執行。 21 if (delay <= 0) 22 return finishPoll(first); 23 // 若是超時時間已到,那麼就直接返回null 24 if (nanos <= 0) 25 return null; 26 // 將first設置爲null,當線程等待時,不持有first的引用 27 first = null; // don't retain ref while waiting 28 // 若是超時時間小於任務的剩餘延時時間,那麼就有可能獲取不到任務。 29 // 在這裏讓線程等待超時時間nanos 30 if (nanos < delay || leader != null) 31 nanos = available.awaitNanos(nanos); 32 else { 33 Thread thisThread = Thread.currentThread(); 34 leader = thisThread; 35 try { 36 // 當任務的延時時間到了時,可以自動超時喚醒。 37 long timeLeft = available.awaitNanos(delay); 38 // 計算剩餘的超時時間 39 nanos -= delay - timeLeft; 40 } finally { 41 if (leader == thisThread) 42 leader = null; 43 } 44 } 45 } 46 } 47 } finally { 48 if (leader == null && queue[0] != null) 49 // 喚醒等待任務的線程 50 available.signal(); 51 lock.unlock(); 52 } 53 }
與take方法相比較,就要考慮設置的超時時間,若是超時時間到了,尚未獲取到有用任務,那麼就返回null。其餘的與take方法中邏輯同樣。
http://www.javashuo.com/article/p-wpcnychj-ek.html
使用優先級隊列DelayedWorkQueue,保證添加到隊列中的任務,會按照任務的延時時間進行排序,延時時間少的任務首先被獲取。