使用 LinkedBlockingQueue 實現簡易版線程池

前一陣子在作聯繫人的導入功能,使用POI組件解析Excel文件後獲取到聯繫人列表,校驗以後批量導入。單從技術層面來講,導入操做一般狀況下是一個比較耗時的操做,並且若是聯繫人達到幾萬、幾十萬級別,必須拆分紅爲子任務來執行。綜上,可使用線程池來解決問題。技術選型上,沒有采用已有的 ThreadPoolExecutor 框架,而使用了自制的簡易版線程池。該簡易版的線程池,其實也是一個簡易版的【生產者-消費者】模型,任務的加入就像是生產的過程,任務的處理就像是消費的過程。咱們在這裏不去討論方案的合理性,只是從技術層面總結一下在實現簡易版線程池的過程當中,我所學到的知識。
 
代碼放在Github上,分享一下: https://github.com/Julius-Liu/threadpool

 

1、線程池設計

咱們首先使用數組 ArrayList 來做爲線程池的存儲結構,例如數組大小爲10就意味着這是一個大小爲10的線程池。而後咱們使用 LinkedBlockingQueue(鏈式阻塞隊列)來存放線程的參數。示意圖以下:

 

當線程池裏的線程初始化完成後,咱們但願線程都處於【飢餓】狀態,隨時等待參數傳入,而後執行。因此,此時線程應該處於阻塞狀態,以下圖所示:
 
當咱們將一個執行任務(一個參數)交給線程池之後,線程池會安排一個線程接收參數,這個線程會進入運行狀態。線程執行完之後,線程又會由於參數隊列爲空而進入阻塞狀態。某線程的執行狀態以下圖所示,執行完的阻塞態,如上圖所示。

 

假設線程池中有3個線程,咱們連續扔了3個參數給線程池,線程池會輪詢獲取線程,將參數塞給他們,而後這些線程會進入運行狀態。運行完成後迴歸阻塞狀態。以下圖所示:

 

以下圖所示,假設線程池中只有3個線程,咱們連續發8個參數給線程池,那麼池會輪流分配參數。線程在收到參數後就會執行。「消耗」掉一個參數後,會繼續消耗下一個參數,直到參數列表爲空爲止。

 

2、爲何使用 LinkedBlockingQueue

1. BlockingQueue

咱們必須先來講說爲何使用阻塞隊列 BlockingQueue。BlockingQueue 隊列爲空時,嘗試獲取隊頭元素的操做會阻塞,一直等到隊列中有元素時再返回。這個阻塞的特性,正是咱們須要的,咱們可讓線程一直等待元素插入,一旦插入當即執行。BlockingQueue 也支持在添加元素時,若是隊列已滿,那麼等到隊列能夠放入新元素時再放入。如此一來,咱們交給線程池的任務就不會丟失,哪怕超過了隊列的容量。
 
因此咱們定下方案,採用阻塞隊列來做爲數據結構,而後咱們來調研阻塞隊列經常使用的5種實現,看看選擇哪一種實現來完成線程池。
 

2. ArrayBlockingQueue

ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法爲添加和刪除的阻塞方法。能夠說 ArrayBlockingQueue 是 阻塞隊列的最直觀的實現。
 

3. DelayQueue

DelayQueue是一個無界阻塞隊列,延遲隊列提供了在指定時間才能獲取隊列元素的功能,隊列頭元素是最接近過時的元素。沒有過時元素的話,使用poll()方法會返回null值,超時斷定是經過getDelay(TimeUnit.NANOSECONDS)方法的返回值小於等於0來判斷。
 
DelayQueue阻塞隊列在咱們系統開發中也經常會用到,例如緩存系統的設計。緩存中的對象,超過了空閒時間,須要從緩存中移出;例如任務調度系統,須要準確的把握任務的執行時間。咱們可能須要經過線程處理不少時間上要求很嚴格的數據,若是使用普通的線程,咱們就須要遍歷全部的對象,一個個檢查看數據是否過時。首先這樣在執行上的效率不會過高,其次就是這種設計的風格也大大的影響了數據的精度。一個須要12:00點執行的任務可能12:01 才執行,這樣對數據要求很高的系統有更大的弊端。使用 DelayQueue 能夠作到精準觸發。
 
由上可知,延遲隊列不是咱們須要的阻塞隊列實現。
 

4. LinkedBlockingQueue

LinkedBlockingQueue是一個由鏈表實現的有界隊列阻塞隊列,但大小默認值爲Integer.MAX_VALUE,也能夠在初始化的時候指定 capacity。和 ArrayBlockingQueue 同樣,其中put方法和take方法爲添加和刪除的阻塞方法。
 

5. PriorityBlockingQueue

優先級阻塞隊列經過使用堆這種數據結構實現將隊列中的元素按照某種排序規則進行排序,從而改變先進先出的隊列順序,提供開發者改變隊列中元素的順序的能力。隊列中的元素必須是可比較的,即實現Comparable接口,或者在構建函數時提供可對隊列元素進行比較的Comparator對象。不能夠放null,會報空指針異常,也不可放置沒法比較的元素;add方法添加元素時,是自下而上的調整堆,取出元素時,是自上而下的調整堆順序。
 
咱們放入參數隊列中的參數都是平級的,不涉及優先級,所以咱們不考慮優先級阻塞隊列。
 

6. SynchronousQueue

同步隊列實際上不是一個真正的隊列,由於它不會爲隊列中元素維護存儲空間。與其餘隊列不一樣的是,它維護一組線程,這些線程在等待着把元素加入或移出隊列。同步隊列是輕量級的,不具備任何內部容量,咱們能夠用來在線程間安全的交換單一元素。
由於同步隊列沒有存儲功能,所以put和take會一直阻塞,直到有另外一個線程已經準備好參與到交付過程當中。僅當有足夠多的消費者,而且老是有一個消費者準備好獲取交付的工做時,才適合使用同步隊列。
 
應用場景,咱們來看一下Java併發包裏的 newCachedThreadPool 方法:
 1 package java.util.concurrent;
 2 
 3 /**
 4  * 帶有緩存的線程池
 5  */
 6 public static ExecutorService newCachedThreadPool() {
 7     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 8                                   60L, TimeUnit.SECONDS,
 9                                   new SynchronousQueue<Runnable>());
10 }

 

Executors.newCachedThreadPool() 方法返回的 ThreadPoolExecutor 實例,其內部的阻塞隊列使用的就是同步隊列。因爲ThreadPoolExecutor內部實現任務提交的時候調用的是工做隊列的非阻塞式入隊列方法(offer方法),所以,在使用同步隊列做爲工做隊列的前提下,客戶端代碼向線程池提交任務時,而線程池中又沒有空閒的線程可以從同步隊列隊列實例中取一個任務,那麼相應的offer方法調用就會失敗(即任務沒有被存入工做隊列)。此時,ThreadPoolExecutor會新建一個新的工做者線程用於對這個入隊列失敗的任務進行處理(假設此時線程池的大小還未達到其最大線程池大小)。
 
如上所述,同步隊列沒有內部容量來存放參數,所以咱們不選擇同步隊列。
 

7. 阻塞隊列選擇

研究了阻塞隊列的5中實現之後,候選者就在 ArrayBlockingQueue 和 LinkedBlockingQueue 二者中。其實要實現本文的簡易版線程池,使用數組阻塞隊列和連接阻塞隊列均可以,若是你要考慮一些極端狀況下的性能問題,那麼透徹的研究二者的使用場景就很是有必要。數組阻塞隊列和連接阻塞隊列的成員變量和方法都很類似,相同點咱們就先不說了。下面咱們來看看二者的不一樣點:
  1. 隊列大小有所不一樣,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue能夠是有界的也能夠是無界的(Integer.MAX_VALUE)。對於後者而言,當添加速度大於移除速度時,在無界的狀況下,可能會形成內存溢出等問題。
  2. 數據存儲容器不一樣,ArrayBlockingQueue採用的是數組做爲數據存儲容器,而LinkedBlockingQueue採用的則是以Node節點做爲鏈接對象的鏈表。
  3. 因爲ArrayBlockingQueue採用的是數組的存儲容器,所以在插入或刪除元素時不會產生或銷燬任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間內須要高效併發地處理大批量數據的時,對於GC可能存在較大影響。
  4. 實現隊列添加或移除的鎖不同,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操做和移除操做採用的同一個ReentrantLock鎖,而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加採用的是putLock,移除採用的則是takeLock,這樣能大大提升隊列的吞吐量,也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。
 

3、LinkedBlockingQueue 底層方法

咱們來調研一下 LinkedBlockingQueue,看看哪些變量和方法可使用。
先來看一下 LinkedBlockingQueue 的數據結構,有一個直觀的瞭解:

 

說明:
  1. LinkedBlockingQueue繼承於AbstractQueue,它本質上是一個FIFO(先進先出)的隊列。
  2. LinkedBlockingQueue實現了BlockingQueue接口,它支持多線程併發。當多線程競爭同一個資源時,某線程獲取到該資源以後,其它線程須要阻塞等待。
  3. LinkedBlockingQueue是經過單鏈表實現的。
    • head是鏈表的表頭。取出數據時,都是從表頭head處獲取。
    • last是鏈表的表尾。新增數據時,都是從表尾last處插入。
    • count是鏈表的實際大小,即當前鏈表中包含的節點個數。
    • capacity是列表的容量,它是在建立鏈表時指定的。
    • putLock是插入鎖,takeLock是取出鎖;notEmpty是「非空條件」,notFull是「未滿條件」。經過它們對鏈表進行併發控制。
 
咱們來看一下 LinkedBlockingQueue 經常使用的變量:

 1 // 容量
 2 private final int capacity;
 3 
 4 // 當前數量
 5 private final AtomicInteger count = new AtomicInteger(0);
 6 
 7 // 鏈表的表頭
 8 transient Node<E> head; 
 9 
10 // 鏈表的表尾
11 private transient Node<E> last; 
12 
13 // 用於控制刪除元素的【取出鎖】和鎖對應的【非空條件】
14 private final ReentrantLock takeLock = new ReentrantLock();
15 private final Condition notEmpty = takeLock.newCondition();
16 
17 // 用於控制添加元素的【插入鎖】和鎖對應的【非滿條件】
18 private final ReentrantLock putLock = new ReentrantLock();
19 private final Condition notFull = putLock.newCondition();

 

這裏的兩把鎖,takeLock 和 putLock,和兩個條件,notEmpty 和 notFull 是咱們考察的重點。
LinkedBlockingQueue在實現「多線程對競爭資源的互斥訪問」時,對於「插入」和「取出(刪除)」操做分別使用了不一樣的鎖
  • 對於插入操做,經過 putLock(插入鎖)進行同步
  • 對於取出操做,經過 takeLock(取出鎖)進行同步
 
此外,插入鎖putLock和notFull(非滿條件)相關聯,取出鎖takeLock和notEmpty(非空條件)相關聯。經過notFull條件和notEmpty條件更細膩的控制putLock 和 takeLock。
 
舉例說明,若某線程(線程A)要取出數據時,隊列正好爲空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向隊列中插入了數據以後,會調用notEmpty.signal()喚醒「notEmpty上的等待線程」。此時,線程A會被喚醒從而得以繼續運行。 此外,線程A在執行取數據前,會獲取takeLock,在取數據執行完畢再釋放takeLock。
 
若某線程(線程H)要插入數據時(put操做),隊列已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據以後,會調用notFull.signal()喚醒「notFull上的等待線程」。此時,線程H就會被喚醒從而得以繼續運行。 此外,線程H在執行插入操做前,會獲取putLock,在插入操做執行完畢才釋放putLock。
 

LinkedBlockingQueue 經常使用函數

 1 // 建立一個容量爲 Integer.MAX_VALUE 的 LinkedBlockingQueue
 2 LinkedBlockingQueue()
 3 
 4 // 建立一個容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含給定 collection 的元素,元素按該 collection 迭代器的遍歷順序添加
 5 LinkedBlockingQueue(Collection<? extends E> c)
 6 
 7 // 建立一個具備給定(固定)容量的 LinkedBlockingQueue
 8 LinkedBlockingQueue(int capacity)
 9 
10 // 從隊列完全移除全部元素
11 void clear()
12 
13 // 將指定元素插入到此隊列的尾部(若是當即可行且不會超出此隊列的容量),在成功時返回 true,若是此隊列已滿,則返回 false
14 boolean offer(E e)
15 
16 // 將指定元素插入到此隊列的尾部,若有必要,則等待指定的時間以使空間變得可用
17 boolean offer(E e, long timeout, TimeUnit unit)
18 
19 // 獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null
20 E peek()
21 
22 // 獲取並移除此隊列的頭,若是此隊列爲空,則返回 null
23 E poll()
24 
25 // 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(若是有必要)
26 E poll(long timeout, TimeUnit unit)
27 
28 // 將指定元素插入到此隊列的尾部,若有隊列滿,則等待空間變得可用
29 void put(E e)
30 
31 // 返回理想狀況下(沒有內存和資源約束)此隊列可接受而且不會被阻塞的附加元素數量
32 int remainingCapacity()
33 
34 // 今後隊列移除指定元素的單個實例(若是存在)
35 boolean remove(Object o)
36 
37 // 返回隊列中的元素個數
38 int size()
39 
40 // 獲取並移除此隊列的頭部,在元素變得可用以前一直等待(若是有必要)
41 E take()

 

咱們看到 offer(E e) 和 put(E e) 都是往隊尾插入元素,poll() 和 take() 都是取出隊頭的元素,可是它們之間仍是有細微的差異,咱們接下來重點看看這4個方法的源碼。
 
下面來看一下 offer(E e) 的源碼:

 1 /**
 2  * 將指定元素插入到此隊列的尾部(若是當即可行且不會超出此隊列的容量)
 3  * 在成功時返回 true,若是此隊列已滿,則返回 false
 4  * 若是使用了有容量限制的隊列,推薦使用add方法,add方法在失敗的時候只是拋出異常
 5  */
 6 public boolean offer(E e) {
 7     if (e == null) throw new NullPointerException();
 8     final AtomicInteger count = this.count;
 9     if (count.get() == capacity)
10         // 若是隊列已滿,則返回false,表示插入失敗
11         return false;
12     int c = -1;
13     Node<E> node = new Node<E>(e);
14     final ReentrantLock putLock = this.putLock;
15     // 獲取 putLock
16     putLock.lock();
17     try {
18         // 再次對【隊列是否是滿】的進行判斷,若是不是滿的,則插入節點
19         if (count.get() < capacity) {
20             enqueue(node);                 // 在隊尾插入節點
21             c = count.getAndIncrement();   // 當前節點數量+1,並返回插入以前節點數量
22             if (c + 1 < capacity)
23                 // 若是在插入元素以後,隊列仍然未滿,則喚醒notFull上的等待線程
24                 notFull.signal();
25         }
26     } finally {
27         // 釋放 putLock
28         putLock.unlock();
29     }
30     if (c == 0)
31         // 若是在插入節點前,隊列爲空,那麼插入節點後,喚醒notEmpty上的等待線程
32         signalNotEmpty();
33     return c >= 0;
34 }

 

下面來看看 put(E e) 的源碼:php

 1 /**
 2  * 將指定元素插入到此隊列的尾部,若有隊列滿,則等待空間變得可用
 3  *
 4  * @throws InterruptedException {@inheritDoc}
 5  * @throws NullPointerException {@inheritDoc}
 6  */
 7 public void put(E e) throws InterruptedException {
 8     if (e == null) throw new NullPointerException();
 9     
10     int c = -1;
11     Node<E> node = new Node<E>(e);
12     final ReentrantLock putLock = this.putLock;
13     final AtomicInteger count = this.count;
14     putLock.lockInterruptibly();    // 可中斷地獲取 putLock
15     try {
16         // count 變量是被 putLock 和 takeLock 保護起來的,因此能夠真實反映隊列當前的容量狀況
17         while (count.get() == capacity) {
18             notFull.await();
19         }
20         enqueue(node);                // 在隊尾插入節點
21         c = count.getAndIncrement();  // 當前節點數量+1,並返回插入以前節點數量
22         if (c + 1 < capacity)
23             // 若是在插入元素以後,隊列仍然未滿,則喚醒notFull上的等待線程
24             notFull.signal();
25     } finally {
26         putLock.unlock();             // 釋放 putLock
27     }
28     if (c == 0)
29         // 若是在插入節點前,隊列爲空,那麼插入節點後,喚醒notEmpty上的等待線程
30         signalNotEmpty();
31 }

 

二者都用到了 signalNotEmpty(),下面來看一下源碼:

 1 /**
 2  * 通知一個等待的take。該方法應該僅僅從put/offer調用,不然通常很難鎖住takeLock
 3  */
 4 private void signalNotEmpty() {
 5     final ReentrantLock takeLock = this.takeLock;
 6     takeLock.lock();           // 獲取 takeLock
 7     try {
 8         notEmpty.signal();     // 喚醒notEmpty上的等待線程,意味着如今能夠獲取元素了
 9     } finally {
10         takeLock.unlock();    // 釋放 takeLock
11     }
12 }

 

下面來看看 poll() 的源碼:

 1 /**
 2  * 獲取並移除此隊列的頭,若是此隊列爲空,則返回 null
 3  */
 4 public E poll() {
 5     final AtomicInteger count = this.count;
 6     if (count.get() == 0)
 7         return null;
 8     E x = null;
 9     int c = -1;
10     final ReentrantLock takeLock = this.takeLock;
11     takeLock.lock();            // 獲取 takeLock
12     try {
13         if (count.get() > 0) {
14             x = dequeue();                 // 獲取隊頭元素,並移除
15             c = count.getAndDecrement();   // 當前節點數量-1,並返回移除以前節點數量
16             if (c > 1)
17                 // 若是在移除元素以後,隊列中仍然有元素,則喚醒notEmpty上的等待線程
18                 notEmpty.signal();
19         }
20     } finally {
21         takeLock.unlock();      // 釋放 takeLock
22     }
23     if (c == capacity)
24         // 若是在移除節點前,隊列是滿的,那麼移除節點後,喚醒notFull上的等待線程
25         signalNotFull();
26     return x;
27 }

 

下面來看看 take() 的源碼:

 1 /**
 2  * 取出並返回隊列的頭。若隊列爲空,則一直等待
 3  */
 4 public E take() throws InterruptedException { 
 5     E x; 
 6     int c = -1; 
 7     final AtomicInteger count = this.count; 
 8     final ReentrantLock takeLock = this.takeLock; 
 9     // 獲取 takeLock,若當前線程是中斷狀態,則拋出InterruptedException異常 
10     takeLock.lockInterruptibly(); 
11     try { 
12         // 若隊列爲空,則一直等待
13        while (count.get() == 0) { 
14            notEmpty.await(); 
15        } 
16        x = dequeue();                  // 從隊頭取出元素 
17        c = count.getAndDecrement();    // 取出元素以後,節點數量-1;並返回移除以前的節點數量
18        if (c > 1) 
19            // 若是在移除元素以後,隊列中仍然有元素,則喚醒notEmpty上的等待線程
20            notEmpty.signal();
21     } finally { 
22         takeLock.unlock();             // 釋放 takeLock
23     } 
24     
25     if (c == capacity) 
26         // 若是在取出元素以前,隊列是滿的,就在取出元素以後,喚醒notFull上的等待線程
27         signalNotFull(); 
28     return x;
29 }

 

二者都用到了signalNotFull(),signalNotFull()的源碼以下:

 1 /**
 2  * 喚醒notFull上的等待線程,只能從 poll 或 take 調用
 3  */
 4 private void signalNotFull() { 
 5     final ReentrantLock putLock = this.putLock; 
 6     putLock.lock();           // putLock 上鎖
 7     try { 
 8         notFull.signal();     // 喚醒notFull上的等待線程,意味着能夠插入元素了
 9     } finally { 
10         putLock.unlock();    // putLock 解鎖
11     }
12 }

 

 
從上面的4個經常使用函數來看,咱們想要在隊列爲空的時候,將獲取這個動做阻塞,所以咱們選擇【take方法】而不是【poll方法】。值得注意的是帶有參數的poll方法能夠更精細地控制當隊列爲空時,獲取動做阻塞多久。在本文中咱們不考慮這種作法,直接讓獲取操做在 notEmpty 上等待。對於插入操做,咱們採用【offer方法】而不是【put方法】,前者在隊列滿的時候返回false,後者在隊列滿的時候會在 notFull 上等待。在本文中,咱們把線程池作的簡單一些,若是隊列滿就提示重試。
 

4、簡易版線程池代碼實現

具有了 LinkedBlockingQueue 的底層代碼解讀之後,咱們來實現簡易版線程池。
其實在簡易版線程池初期,因爲對 LinkedBlockingQueue 的底層方法不熟悉,所以對線程手動 wait 和上鎖。具體來講,根據隊列size的狀況來決定線程是否要進入wait方法,而後在插入參數的時候,使用 synchronized 關鍵字鎖住整個隊列,再offer。這種作法,徹底沒有考慮已有的 takeLock,putLock,notEmpty條件和notFull條件。因此後來仔細研究了連接阻塞隊列的特性,修改了線程池的實現,算是作了正確的事。
 

1. 註冊成爲 Spring Bean

咱們但願在Springboot 程序啓動的時候,將線程池初始化。可使用 Spring 提供的 InitializingBean 接口的 afterPropertiesSet 方法,在全部基礎屬性初始化完成後,進行線程池的初始化。
 1 package cn.com.gkmeteor.threadpool.utils;
 2 
 3 @Component
 4 public class ThreadPoolUtil implements InitializingBean {
 5 
 6     public static int POOL_SIZE = 10;
 7 
 8     @Autowired
 9     private ThreadExecutorService threadExecutorService;   // 具體的線程處理類
10 
11     private List<ThreadWithQueue> threadpool = new ArrayList<>();
12 
13     /**
14      * 在全部基礎屬性初始化完成後,初始化當前類
15      *
16      * @throws Exception
17      */
18     @Override
19     public void afterPropertiesSet() throws Exception {
20         for (int i = 0; i < POOL_SIZE; i++) {
21             ThreadWithQueue threadWithQueue = new ThreadWithQueue(i, threadExecutorService);
22             this.threadpool.add(threadWithQueue);
23         }
24     }
25 }

 

2. 輪詢獲取一個線程

咱們但願將任務輪流分給線程池中的線程。要實現這個比較簡單,直接兩行代碼搞定。
1 public static int POOL_SIZE = 10;  // 線程池容量
2 index = (++index) % POOL_SIZE;     // index 是當前選中的線程下標

 

3. 參數入隊和出隊,線程運行和阻塞

主要使用 queue.offer(param) 和 String param = queue.take() 這兩個方法,具體來看下面的代碼:

 1 package cn.com.gkmeteor.threadpool.utils;
 2 
 3 import cn.com.gkmeteor.threadpool.service.ThreadExecutorService;
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 
 7 import java.util.concurrent.BlockingQueue;
 8 
 9 /**
10  * 帶有【參數阻塞隊列】的線程
11  */
12 public class ThreadWithQueue extends Thread {
13 
14     public static int CAPACITY = 10;
15 
16     private Logger logger = LoggerFactory.getLogger(ThreadWithQueue.class);
17 
18     private BlockingQueue<String> queue;
19 
20     private ThreadExecutorService threadExecutorService;    // 線程運行後的業務邏輯處理
21 
22     private String threadName;
23 
24     public String getThreadName() {
25         return threadName;
26     }
27 
28     public void setThreadName(String threadName) {
29         this.threadName = threadName;
30     }
31 
32     /**
33      * 構造方法
34      *
35      * @param i                     第幾個線程
36      * @param threadExecutorService 線程運行後的業務邏輯處理
37      */
38     public ThreadWithQueue(int i, ThreadExecutorService threadExecutorService) {
39         queue = new java.util.concurrent.LinkedBlockingQueue<>(CAPACITY);
40         threadName = "Thread(" + i + ")";
41 
42         this.threadExecutorService = threadExecutorService;
43 
44         this.start();
45     }
46 
47     /**
48      * 將參數放到線程的參數隊列中
49      *
50      * @param param 參數
51      * @return
52      */
53     public String paramAdded(String param) {
54         String result = "";
55         if(queue.offer(param)) {
56             logger.info("參數已入隊,{} 目前參數個數 {}", this.getThreadName(), queue.size());
57             result = "參數已加入線程池,等待處理";
58         } else {
59             logger.info("隊列已達最大容量,請稍後重試");
60             result = "線程池已滿,請稍後重試";
61         }
62         return result;
63     }
64 
65     public synchronized int getQueueSize() {
66         return queue.size();
67     }
68 
69     @Override
70     public void run() {
71         while (true) {
72             try {
73                 String param = queue.take();
74                 logger.info("{} 開始運行,參數隊列中還有 {} 個在等待", this.getThreadName(), this.getQueueSize());
75                 if (param.startsWith("contact")) {
76                     threadExecutorService.doContact(param);
77                 } else if (param.startsWith("user")) {
78                     threadExecutorService.doUser(param);
79                 } else {
80                     logger.info("參數無效,不作處理");
81                 }
82                 logger.info("{} 本次處理完成", this.getThreadName());
83             } catch (Exception e) {
84                 e.printStackTrace();
85             }
86         }
87     }
88 }

 

瞭解了連接阻塞隊列的底層方法後,使用起來就底氣十足。具體來講:java

offer方法會往隊尾添加元素,若是隊列已滿,那麼就會返回false,我在這時告訴調用者,線程池已滿,請稍後重試。
take方法會取出隊首元素,若是隊列爲空則一直等待。因此當全部線程初始化完成後,第一次運行的時候都會阻塞在 String param = queue.take(),一旦有參數入隊,纔會繼續執行。又由於 while(true) 循環,會不斷地take,根據隊列中參數的狀況來運行或阻塞。
 

5、總結

本文使用 LinkedBlockingQueue 實現了一個簡易版的線程池,該線程池使用在聯繫人導入的任務中。同時閱讀了連接阻塞隊列和數組阻塞隊列的源碼,對阻塞隊列有所瞭解,僅僅作到了會使用阻塞隊列。
 

6、參考資料

相關文章
相關標籤/搜索