併發編程(十四)—— ScheduledThreadPoolExecutor 實現原理與源碼深度解析 之DelayWorkQueue

目錄html

 

正文java

咱們知道線程池運行時,會不斷從任務隊列中獲取任務,而後執行任務。若是咱們想實現延時或者定時執行任務,重要一點就是任務隊列會根據任務延時時間的不一樣進行排序,延時時間越短地就排在隊列的前面,先被獲取執行。數組

隊列是先進先出的數據結構,就是先進入隊列的數據,先被獲取。可是有一種特殊的隊列叫作優先級隊列,它會對插入的數據進行優先級排序,保證優先級越高的數據首先被獲取,與數據的插入順序無關。安全

實現優先級隊列高效經常使用的一種方式就是使用堆。數據結構

回到頂部多線程

什麼是堆?

堆一般是一個能夠被看作一棵樹的數組對象。併發

堆(heap)又被爲優先隊列(priority queue)。儘管名爲優先隊列,但堆並非隊列。動畫

由於隊列中容許的操做是先進先出(FIFO),在隊尾插入元素,在隊頭取出元素。this

而堆雖然在堆底插入元素,在堆頂取出元素,可是堆中元素的排列不是按照到來的前後順序,而是按照必定的優先順序排列的。spa

 

這裏來講明一下滿二叉樹的概念與徹底二叉樹的概念。

滿二叉樹

  除了葉子節點,全部的節點的左右孩子都不爲空,就是一棵滿二叉樹,以下圖。

能夠看出:滿二叉樹全部的節點都擁有左孩子,又擁有右孩子。

徹底二叉樹

  不必定是一個滿二叉樹,但它不滿的那部分必定在右下側,以下圖

 

堆老是知足下列性質:

  • 堆中某個節點的值老是不大於或不小於其父節點的值;

  • 堆老是一棵徹底二叉樹。

 

  • 最大值時,稱爲「最大堆」,也稱大頂堆;
  • 最小值時,稱爲「最小堆」,也稱小頂堆。

回到頂部

堆的實現

堆是一個二叉樹,可是它最簡單的方式是經過數組去實現二叉樹,並且由於堆是一個徹底二叉樹,就不存在數組空間的浪費。怎麼使用數組來存儲二叉樹呢?

就是用數組的下標來模擬二叉樹的各個節點,好比說根節點就是0,第一層的左節點是1,右節點是2。由此咱們能夠得出下列公式:

1 // 對於n位置的節點來講:
2 int left = 2 * n + 1; // 左子節點
3 int right = 2 * n + 2; // 右子節點
4 int parent = (n - 1) / 2; // 父節點,固然n要大於0,根節點是沒有父節點的

對於堆來講,只有兩個操做,插入insert和刪除remove,無論插入仍是刪除保證堆的成立條件,1.是徹底二叉樹,2.父節點的值不能小於子節點的值。

最大堆的插入(ADD)

複製代碼

1 public void insert(int value) {
 2      // 第一步將插入的值,直接放在最後一個位置。並將長度加一
 3      store[size++] = value;
 4      // 獲得新插入值所在位置。
 5      int index = size - 1;
 6      while(index > 0) {
 7          // 它的父節點位置座標
 8          int parentIndex = (index - 1) / 2;
 9          // 若是父節點的值小於子節點的值,你不知足堆的條件,那麼就交換值
10          if (store[index] > store[parentIndex]) {
11              swap(store, index, parentIndex);
12              index = parentIndex;
13          } else {
14              // 不然表示這條路徑上的值已經知足降序,跳出循環
15              break;
16          }
17      }
18 }

複製代碼

主要步驟:

  • 直接將value插入到size位置,並將size自增,這樣store數組中插入一個值了。

  • 要保證從這個葉節點到根節點這條路徑上的節點,知足父節點的值不能小於子節點。

  • 經過int parentIndex = (index - 1) / 2獲得父節點,若是比父節點值大,那麼二者位置的值交換,而後再拿這個父節點和它的父父節點比較。

    直到這個節點值比父節點值小,或者這個節點已是根節點就退出循環。

由於每次循環index都是除以2這種倍數遞減的方式,因此它最多循環次數是(log N)次。

最大堆的刪除(DELETE)

複製代碼

1 public int remove() {
 2       // 將根的值記錄,最後返回
 3       int result = store[0];
 4       // 將最後位置的值放到根節點位置
 5       store[0] = store[--size];
 6       int index = 0;
 7       // 經過循環,保證父節點的值不能小於子節點。
 8       while(true) {
 9           int leftIndex = 2 * index + 1; // 左子節點
10           int rightIndex = 2 * index + 2; // 右子節點
11           // leftIndex >= size 表示這個子節點尚未值。
12           if (leftIndex >= size) break;
13           int maxIndex = leftIndex;
14           //找到左右節點中較大的一個節點
15           if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex;
16           //與子節點中較大的子節點比較,若是子節點更大,則交換位置
17           //爲何要與較大的子節點比較呢?若是和較小的節點比較,沒有交換位置,但有可能比較大的節點小
18           if (store[index] < store[maxIndex]) {
19               swap(store, index, maxIndex);
20               index = maxIndex;
21           } else {
22               //知足子節點比當前節點小,退出循環
23               break;
24           }
25       }
26       //返回最開始的第一個值
27       return result;
28 }

複製代碼

在堆中最大值就在根節點,因此操做步驟:

  1. 將根節點的值保存到result中。

  2. 將最後節點的值移動到根節點,再將長度減一,這樣知足堆成立第一個條件,堆是一個徹底二叉樹。

  3. 使用循環,來知足堆成立的第二個條件,父節點的值不能小於子節點的值。

  4. 最後返回result。

每次循環咱們都是以2的倍數遞增,因此它也是最多循環次數是(log N)次。

因此經過堆這種方式能夠快速實現優先級隊列,它的插入和刪除操做的效率都是O(log N)。


那麼怎麼實現堆排序?這個很簡單,利用優先隊列的特性:

  1. 先遍歷數組。將數組中的值依次插入到堆中。
  2. 而後再用一個循環將值從堆中取出來。

複製代碼

1 private static void headSort(int[] arr) {
 2       int size = arr.length;
 3       Head head = new Head(size);
 4       for (int i = 0; i < size; i++) {
 5           head.insert(arr[i]);
 6       }
 7       for (int i = 0; i < size; i++) {
 8           //  實現從大到小的排序
 9           arr[size - 1 - i] = head.remove();
10       }
11 }

複製代碼

堆排序的效率:由於每次插入數據效率是O(log N),而咱們須要進行n次循環,將數組中每一個值插入到堆中,因此它的執行時間是O(N * log N)級。

回到頂部

DelayedWorkQueue類

1 static class DelayedWorkQueue extends AbstractQueue<Runnable>
2         implements BlockingQueue<Runnable> {

從定義中看出DelayedWorkQueue是一個阻塞隊列。而且DelayedWorkQueue是一個最小堆,最頂點的值最小,即堆中某個節點的值老是不小於其父節點的值。

屬性

複製代碼

1 // 初始時,數組長度大小。
 2 private static final int INITIAL_CAPACITY = 16;
 3 // 使用數組來儲存隊列中的元素。
 4 private RunnableScheduledFuture<?>[] queue =
 5     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
 6 // 使用lock來保證多線程併發安全問題。
 7 private final ReentrantLock lock = new ReentrantLock();
 8 // 隊列中儲存元素的大小
 9 private int size = 0;
10 
11 //特指隊列頭任務所在線程
12 private Thread leader = null;
13 
14 // 當隊列頭的任務延時時間到了,或者有新的任務變成隊列頭時,用來喚醒等待線程
15 private final Condition available = lock.newCondition();

複製代碼

DelayedWorkQueue是用數組來儲存隊列中的元素,那麼咱們看看它是怎麼實現優先級隊列的。

插入元素方法

複製代碼

1 public void put(Runnable e) {
 2     offer(e);
 3 }
 4 
 5 public boolean add(Runnable e) {
 6     return offer(e);
 7 }
 8 
 9 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
10     return offer(e);
11 }

複製代碼

咱們發現與普通阻塞隊列相比,這三個添加方法都是調用offer方法。那是由於它沒有隊列已滿的條件,也就是說能夠不斷地向DelayedWorkQueue添加元素,當元素個數超過數組長度時,會進行數組擴容。

複製代碼

1 public boolean offer(Runnable x) {
 2     if (x == null)
 3         throw new NullPointerException();
 4     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
 5     // 使用lock保證併發操做安全
 6     final ReentrantLock lock = this.lock;
 7     lock.lock();
 8     try {
 9         int i = size;
10         // 若是要超過數組長度,就要進行數組擴容
11         if (i >= queue.length)
12             // 數組擴容
13             grow();
14         // 將隊列中元素個數加一
15         size = i + 1;
16         // 若是是第一個元素,那麼就不須要排序,直接賦值就好了
17         if (i == 0) {
18             queue[0] = e;
19             setIndex(e, 0);
20         } else {
21             // 調用siftUp方法,使插入的元素變得有序。
22             siftUp(i, e);
23         }
24         // 表示新插入的元素是隊列頭,更換了隊列頭,
25         // 那麼就要喚醒正在等待獲取任務的線程。
26         if (queue[0] == e) {
27             leader = null;
28             // 喚醒正在等待等待獲取任務的線程
29             available.signal();
30         }
31     } finally {
32         lock.unlock();
33     }
34     return true;
35 }

複製代碼

數組擴容方法:

複製代碼

1 private void grow() {
2     int oldCapacity = queue.length;
3     // 每次擴容增長原來數組的一半數量。
4     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
5     if (newCapacity < 0) // overflow
6         newCapacity = Integer.MAX_VALUE;
7     // 使用Arrays.copyOf來複制一個新數組
8     queue = Arrays.copyOf(queue, newCapacity);
9 }

複製代碼

插入元素排序siftUp方法:

複製代碼

1 private void siftUp(int k, RunnableScheduledFuture<?> key) {
 2     // 當k==0時,就到了堆二叉樹的根節點了,跳出循環
 3     while (k > 0) {
 4         // 父節點位置座標, 至關於(k - 1) / 2
 5         int parent = (k - 1) >>> 1;
 6         // 獲取父節點位置元素
 7         RunnableScheduledFuture<?> e = queue[parent];
 8         // 若是key元素大於父節點位置元素,知足條件,那麼跳出循環
 9         // 由於是從小到大排序的。
10         if (key.compareTo(e) >= 0)
11             break;
12         // 不然就將父節點元素存放到k位置
13         queue[k] = e;
14         // 這個只有當元素是ScheduledFutureTask對象實例纔有用,用來快速取消任務。
15         setIndex(e, k);
16         // 從新賦值k,尋找元素key應該插入到堆二叉樹的那個節點
17         k = parent;
18     }
19     // 循環結束,k就是元素key應該插入的節點位置
20     queue[k] = key;
21     setIndex(key, k);
22 }

複製代碼

主要是三步:

  • 元素個數超過數組長度,就會調用grow()方法,進行數組擴容。
  • 將新元素e添加到優先級隊列中對應的位置,經過siftUp方法,保證按照元素的優先級排序。
  • 若是新插入的元素是隊列頭,即更換了隊列頭,那麼就要喚醒正在等待獲取任務的線程。這些線程多是由於原隊列頭元素的延時時間沒到,而等待的。

咱們來看看動畫

假設現有元素 5 須要插入,爲了維持徹底二叉樹的特性,新插入的元素必定是放在結點 6 的右子樹;同時爲了知足任一結點的值要小於左右子樹的值這一特性,新插入的元素要和其父結點做比較,若是比父結點小,就要把父結點拉下來頂替當前結點的位置,本身則依次不斷向上尋找,找到比本身大的父結點就拉下來,直到沒有符合條件的值爲止。

動畫講解:

  1. 在這裏先將元素 5 插入到末尾,即放在結點 6 的右子樹。

  2. 而後與父類比較, 6 > 5 ,父類數字大於子類數字,子類與父類交換。

  3. 重複此操做,直到不發生替換。

當即獲取隊列頭元素

複製代碼

1 public RunnableScheduledFuture<?> poll() {
 2     final ReentrantLock lock = this.lock;
 3     lock.lock();
 4     try {
 5         RunnableScheduledFuture<?> first = queue[0];
 6         // 隊列頭任務是null,或者任務延時時間沒有到,都返回null
 7         if (first == null || first.getDelay(NANOSECONDS) > 0)
 8             return null;
 9         else
10             // 移除隊列頭元素
11             return finishPoll(first);
12     } finally {
13         lock.unlock();
14     }
15 }

複製代碼

1 public long getDelay(TimeUnit unit) {
2     return unit.convert(time - now(), NANOSECONDS);
3 }

當隊列頭任務是null,或者任務延時時間沒有到,表示這個任務還不能返回,所以直接返回null。不然調用finishPoll方法,移除隊列頭元素並返回。

複製代碼

1 // 移除隊列頭元素
 2 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
 3     // 將隊列中元素個數減一
 4     int s = --size;
 5     // 獲取隊列末尾元素x
 6     RunnableScheduledFuture<?> x = queue[s];
 7     // 原隊列末尾元素設置爲null
 8     queue[s] = null;
 9     if (s != 0)
10         // 將隊列最後一個元素移動到對列頭元素位置,而後向下排序
11         // 由於移除了隊列頭元素,因此進行從新排序。
12         siftDown(0, x);
13     setIndex(f, -1);
14     return f;
15 }

複製代碼

這個方法與咱們在第一節中,介紹堆的刪除方法同樣。

  1. 先將隊列中元素個數減一。
  2. 將原隊列末尾元素設置成隊列頭元素,再將隊列末尾元素設置爲null。
  3. 調用siftDown(0, x)方法,保證按照元素的優先級排序。

移除元素排序siftDown方法:

複製代碼

1 private void siftDown(int k, RunnableScheduledFuture<?> key) {
 2     int half = size >>> 1;
 3     // 經過循環,保證父節點的值不能大於子節點。
 4     while (k < half) {
 5         // 左子節點, 至關於 (k * 2) + 1
 6         int child = (k << 1) + 1;
 7         // 左子節點位置元素
 8         RunnableScheduledFuture<?> c = queue[child];
 9         // 右子節點, 至關於 (k * 2) + 2
10         int right = child + 1;
11         // 若是左子節點元素值大於右子節點元素值,那麼右子節點纔是較小值的子節點。
12         // 就要將c與child值從新賦值
13         if (right < size && c.compareTo(queue[right]) > 0)
14             c = queue[child = right];
15         // 若是父節點元素值小於較小的子節點元素值,那麼就跳出循環
16         if (key.compareTo(c) <= 0)
17             break;
18         // 不然,父節點元素就要和子節點進行交換
19         queue[k] = c;
20         setIndex(c, k);
21         k = child;
22     }
23     // 循環結束,k就是元素key應該插入的節點位置
24     queue[k] = key;
25     setIndex(key, k);
26 }

複製代碼

咱們來看看動畫

 

核心點:將最後一個元素填充到堆頂,而後不斷的下沉這個元素。

假設要從節點 1 ,也能夠稱爲取出節點 1 ,爲了維持徹底二叉樹的特性 ,咱們將最後一個元素 6 去替代這個 1 ;而後比較 1 和其子樹的大小關係,若是比左右子樹大(若是存在的話),就要從左右子樹中找一個較小的值替換它,而它能本身就要跑到對應子樹的位置,再次循環這種操做,直到沒有子樹比它小。

經過這樣的操做,堆依然是堆,總結一下:

  • 找到要刪除的節點(取出的節點)在數組中的位置
  • 用數組中最後一個元素替代這個位置的元素
  • 當前位置和其左右子樹比較,保證符合最小堆的節點間規則
  • 刪除最後一個元素

等待獲取隊列頭元素

複製代碼

1 public RunnableScheduledFuture<?> take() throws InterruptedException {
 2     final ReentrantLock lock = this.lock;
 3     lock.lockInterruptibly();
 4     try {
 5         for (;;) {
 6             RunnableScheduledFuture<?> first = queue[0];
 7             // 若是沒有任務,就讓線程在available條件下等待。
 8             if (first == null)
 9                 available.await();
10             else {
11                 // 獲取任務的剩餘延時時間
12                 long delay = first.getDelay(NANOSECONDS);
13                 // 若是延時時間到了,就返回這個任務,用來執行。
14                 if (delay <= 0)
15                     return finishPoll(first);
16                 // 將first設置爲null,當線程等待時,不持有first的引用
17                 first = null; // don't retain ref while waiting
18 
19                 // 若是仍是原來那個等待隊列頭任務的線程,
20                 // 說明隊列頭任務的延時時間尚未到,繼續等待。
21                 if (leader != null)
22                     available.await();
23                 else {
24                     // 記錄一下當前等待隊列頭任務的線程
25                     Thread thisThread = Thread.currentThread();
26                     leader = thisThread;
27                     try {
28                         // 當任務的延時時間到了時,可以自動超時喚醒。
29                         available.awaitNanos(delay);
30                     } finally {
31                         if (leader == thisThread)
32                             leader = null;
33                     }
34                 }
35             }
36         }
37     } finally {
38         if (leader == null && queue[0] != null)
39             // 喚醒等待任務的線程
40             available.signal();
41         lock.unlock();
42     }
43 }

複製代碼

若是隊列中沒有任務,那麼就讓當前線程在available條件下等待。若是隊列頭任務的剩餘延時時間delay大於0,那麼就讓當前線程在available條件下等待delay時間。

超時等待獲取隊列頭元素

複製代碼

1 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
 2     throws InterruptedException {
 3     long nanos = unit.toNanos(timeout);
 4     final ReentrantLock lock = this.lock;
 5     lock.lockInterruptibly();
 6     try {
 7         for (;;) {
 8             RunnableScheduledFuture<?> first = queue[0];
 9             // 若是沒有任務。
10             if (first == null) {
11                 // 超時時間已到,那麼就直接返回null
12                 if (nanos <= 0)
13                     return null;
14                 else
15                     // 不然就讓線程在available條件下等待nanos時間
16                     nanos = available.awaitNanos(nanos);
17             } else {
18                 // 獲取任務的剩餘延時時間
19                 long delay = first.getDelay(NANOSECONDS);
20                 // 若是延時時間到了,就返回這個任務,用來執行。
21                 if (delay <= 0)
22                     return finishPoll(first);
23                 // 若是超時時間已到,那麼就直接返回null
24                 if (nanos <= 0)
25                     return null;
26                 // 將first設置爲null,當線程等待時,不持有first的引用
27                 first = null; // don't retain ref while waiting
28                 // 若是超時時間小於任務的剩餘延時時間,那麼就有可能獲取不到任務。
29                 // 在這裏讓線程等待超時時間nanos
30                 if (nanos < delay || leader != null)
31                     nanos = available.awaitNanos(nanos);
32                 else {
33                     Thread thisThread = Thread.currentThread();
34                     leader = thisThread;
35                     try {
36                         // 當任務的延時時間到了時,可以自動超時喚醒。
37                         long timeLeft = available.awaitNanos(delay);
38                         // 計算剩餘的超時時間
39                         nanos -= delay - timeLeft;
40                     } finally {
41                         if (leader == thisThread)
42                             leader = null;
43                     }
44                 }
45             }
46         }
47     } finally {
48         if (leader == null && queue[0] != null)
49             // 喚醒等待任務的線程
50             available.signal();
51         lock.unlock();
52     }
53 }

複製代碼

與take方法相比較,就要考慮設置的超時時間,若是超時時間到了,尚未獲取到有用任務,那麼就返回null。其餘的與take方法中邏輯同樣。

回到頂部

推薦博客

  http://www.javashuo.com/article/p-wpcnychj-ek.html

回到頂部

總結

使用優先級隊列DelayedWorkQueue,保證添加到隊列中的任務,會按照任務的延時時間進行排序,延時時間少的任務首先被獲取。

相關文章
相關標籤/搜索