JAVA線程池ThreadPoolExecutor與阻塞隊列BlockingQueue .

從Java5開始,Java提供了本身的線程池。每次只執行指定數量的線程,java.util.concurrent.ThreadPoolExecutor 就是這樣的線程池。如下是個人學習過程。

首先是構造函數簽名以下: java

  1. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);   
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
參數介紹:

corePoolSize 核心線程數,指保留的線程池大小(不超過maximumPoolSize值時,線程池中最多有corePoolSize 個線程工做)。 
maximumPoolSize 指的是線程池的最大大小(線程池中最大有corePoolSize 個線程可運行)。 
keepAliveTime 指的是空閒線程結束的超時時間(當一個線程不工做時,過keepAliveTime 長時間將中止該線程)。 
unit 是一個枚舉,表示 keepAliveTime 的單位(有NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS,7個可選值)。 
workQueue 表示存聽任務的隊列(存放須要被線程池執行的線程隊列)。 
handler 拒絕策略(添加任務失敗後如何處理該任務). ide

一、線程池剛建立時,裏面沒有一個線程。任務隊列是做爲參數傳進來的。不過,就算隊列裏面有任務,線程池也不會立刻執行它們。
二、當調用 execute() 方法添加一個任務時,線程池會作以下判斷:
    a. 若是正在運行的線程數量小於 corePoolSize,那麼立刻建立線程運行這個任務;
    b. 若是正在運行的線程數量大於或等於 corePoolSize,那麼將這個任務放入隊列。
    c. 若是這時候隊列滿了,並且正在運行的線程數量小於 maximumPoolSize,那麼仍是要建立線程運行這個任務;
    d. 若是隊列滿了,並且正在運行的線程數量大於或等於 maximumPoolSize,那麼線程池會拋出異常,告訴調用者「我不能再接受任務了」。
三、當一個線程完成任務時,它會從隊列中取下一個任務來執行。
四、當一個線程無事可作,超過必定的時間(keepAliveTime)時,線程池會判斷,若是當前運行的線程數大於 corePoolSize,那麼這個線程就被停掉。因此線程池的全部任務完成後,它最終會收縮到 corePoolSize 的大小。
       這個過程說明,並非先加入任務就必定會先執行。假設隊列大小爲 4,corePoolSize爲2,maximumPoolSize爲6,那麼當加入15個任務時,執行的順序相似這樣:首先執行任務 一、2,而後任務3~6被放入隊列。這時候隊列滿了,任務七、八、九、10 會被立刻執行,而任務 11~15 則會拋出異常。最終順序是:一、二、七、八、九、十、三、四、五、6。固然這個過程是針對指定大小的ArrayBlockingQueue<Runnable>來講,若是是LinkedBlockingQueue<Runnable>,由於該隊列無大小限制,因此不存在上述問題。 函數

示例一,LinkedBlockingQueue<Runnable>隊列使用: 學習

  1. import java.util.concurrent.BlockingQueue;  
  2. import java.util.concurrent.LinkedBlockingQueue;  
  3. import java.util.concurrent.ThreadPoolExecutor;  
  4. import java.util.concurrent.TimeUnit;  
  5.   
  6. /** 
  7.  * Created on 2011-12-28 
  8.  * <p>Description: [Java 線程池學習]</p> 
  9.  * @author         shixing_11@sina.com 
  10.  */  
  11. public class ThreadPoolTest implements Runnable {   
  12.      public void run() {   
  13.           synchronized(this) {   
  14.             try{  
  15.                 System.out.println(Thread.currentThread().getName());  
  16.                 Thread.sleep(3000);  
  17.             }catch (InterruptedException e){  
  18.                 e.printStackTrace();  
  19.             }  
  20.           }   
  21.      }   
  22.        
  23.      public static void main(String[] args) {   
  24.          BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();  
  25.          ThreadPoolExecutor executor = new ThreadPoolExecutor(261, TimeUnit.DAYS, queue);  
  26.          for (int i = 0; i < 10; i++) {     
  27.              executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));     
  28.              int threadSize = queue.size();  
  29.              System.out.println("線程隊列大小爲-->"+threadSize);  
  30.          }     
  31.          executor.shutdown();    
  32.      }  
  33. }   
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created on 2011-12-28
 * <p>Description: [Java 線程池學習]</p>
 * @author         shixing_11@sina.com
 */
public class ThreadPoolTest implements Runnable { 
     public void run() { 
          synchronized(this) { 
            try{
                System.out.println(Thread.currentThread().getName());
                Thread.sleep(3000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
          } 
     } 
     
     public static void main(String[] args) { 
         BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("線程隊列大小爲-->"+threadSize);
         }   
         executor.shutdown();  
     }
}
輸出結果以下:

線程隊列大小爲-->0
線程名稱:pool-1-thread-1
線程隊列大小爲-->0
線程隊列大小爲-->1
線程隊列大小爲-->2
線程隊列大小爲-->3
線程隊列大小爲-->4
線程隊列大小爲-->5
線程隊列大小爲-->6
線程隊列大小爲-->7
線程隊列大小爲-->8
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
測試

可見,線程隊列最大爲8,共執行了10個線線程。由於是從線程池裏運行的線程,因此雖然將線程的名稱設爲"TestThread".concat(""+i),但輸出後仍是變成了pool-1-thread-x。 this

示例二,LinkedBlockingQueue<Runnable>隊列使用: spa

  1. import java.util.concurrent.BlockingQueue;  
  2. import java.util.concurrent.ArrayBlockingQueue;  
  3. import java.util.concurrent.ThreadPoolExecutor;  
  4. import java.util.concurrent.TimeUnit;  
  5.   
  6. /** 
  7.  * Created on 2011-12-28 
  8.  * <p>Description: [Java 線程池學習]</p> 
  9.  * @author         shixing_11@sina.com 
  10.  */  
  11. public class ThreadPoolTest implements Runnable {   
  12.      public void run() {   
  13.           synchronized(this) {   
  14.             try{  
  15.                 System.out.println("線程名稱:"+Thread.currentThread().getName());  
  16.                 Thread.sleep(3000); //休眠是爲了讓該線程不至於執行完畢後從線程池裏釋放   
  17.             }catch (InterruptedException e){  
  18.                 e.printStackTrace();  
  19.             }  
  20.           }   
  21.      }   
  22.        
  23.      public static void main(String[] args) throws InterruptedException {   
  24.          BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列   
  25.          ThreadPoolExecutor executor = new ThreadPoolExecutor(261, TimeUnit.DAYS, queue);  
  26.          for (int i = 0; i < 10; i++) {     
  27.              executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));     
  28.              int threadSize = queue.size();  
  29.              System.out.println("線程隊列大小爲-->"+threadSize);  
  30.          }     
  31.          executor.shutdown();    
  32.      }  
  33. }  
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created on 2011-12-28
 * <p>Description: [Java 線程池學習]</p>
 * @author         shixing_11@sina.com
 */
public class ThreadPoolTest implements Runnable { 
     public void run() { 
          synchronized(this) { 
            try{
                System.out.println("線程名稱:"+Thread.currentThread().getName());
                Thread.sleep(3000); //休眠是爲了讓該線程不至於執行完畢後從線程池裏釋放
            }catch (InterruptedException e){
                e.printStackTrace();
            }
          } 
     } 
     
     public static void main(String[] args) throws InterruptedException { 
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("線程隊列大小爲-->"+threadSize);
         }   
         executor.shutdown();  
     }
}
輸出結果以下:

線程隊列大小爲-->0
線程隊列大小爲-->0
線程隊列大小爲-->1
線程隊列大小爲-->2
線程隊列大小爲-->3
線程隊列大小爲-->4
線程隊列大小爲-->4
線程隊列大小爲-->4
線程隊列大小爲-->4
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-4
線程隊列大小爲-->4
線程名稱:pool-1-thread-5
線程名稱:pool-1-thread-6
線程名稱:pool-1-thread-5
線程名稱:pool-1-thread-6
線程名稱:pool-1-thread-4
線程名稱:pool-1-thread-2
.net

可見,總共10個線程,由於核心線程數爲2,2個線程被當即運行,線程隊列大小爲4,因此4個線程被加入隊列,最大線程數爲6,還能運行6-2=4個,其10個線程的其他4個線程又當即運行了。 線程

若是將咱們要運行的線程數10改成11,則因爲最大線程數6+線程隊列大小4=10<11,則根據線程池工做原則,最後一個線程將被拒絕策略拒絕,將示例二的main方法改成以下: code

  1. public static void main(String[] args) throws InterruptedException {   
  2.     ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列   
  3.     ThreadPoolExecutor executor = new ThreadPoolExecutor(261, TimeUnit.DAYS, queue);  
  4.     for (int i = 0; i < 11; i++) {     
  5.         executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));     
  6.         int threadSize = queue.size();  
  7.         System.out.println("線程隊列大小爲-->"+threadSize);  
  8.     }     
  9.     executor.shutdown();    
  10. }  
public static void main(String[] args) throws InterruptedException { 
         ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 11; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("線程隊列大小爲-->"+threadSize);
         }   
         executor.shutdown();  
     }
輸出結果:

線程隊列大小爲-->0
線程名稱:pool-1-thread-1
線程隊列大小爲-->0
線程隊列大小爲-->1
線程隊列大小爲-->2
線程隊列大小爲-->3
線程隊列大小爲-->4
線程名稱:pool-1-thread-2
線程隊列大小爲-->4
線程名稱:pool-1-thread-3
線程隊列大小爲-->4
線程名稱:pool-1-thread-4
線程隊列大小爲-->4
線程名稱:pool-1-thread-5
線程隊列大小爲-->4
線程名稱:pool-1-thread-6
Exception in thread "main" java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at ths.ThreadPoolTest.main(ThreadPoolTest.java:30)

線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-4

很明顯,拋RejectedExecutionException異常了,被拒絕策略拒絕了,這就說明線程超出了線程池的總容量(線程隊列大小+最大線程數)。

         對於 java.util.concurrent.BlockingQueue 類有有三種方法將線程添加到線程隊列裏面,然而如何區別三種方法的不一樣呢,其實在隊列未滿的狀況下結果相同,都是將線程添加到線程隊列裏面,區分就在於當線程隊列已經滿的時候,此時

public boolean add(E e) 方法將拋出IllegalStateException異常,說明隊列已滿。

public boolean offer(E e) 方法則不會拋異常,只會返回boolean值,告訴你添加成功與否,隊列已滿,固然返回false。

public void put(E e) throws InterruptedException 方法則一直阻塞(即等待,直到線程池中有線程運行完畢,能夠加入隊列爲止)。

爲了證實對上面這三個方法的描述,咱們將示例二改成以下、public boolean add(E e)方法測試程序:

  1. public static void main(String[] args) throws InterruptedException {   
  2.     BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列   
  3.     ThreadPoolExecutor executor = new ThreadPoolExecutor(261, TimeUnit.DAYS, queue);  
  4.     for (int i = 0; i < 10; i++) {     
  5.         executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));     
  6.         int threadSize = queue.size();  
  7.         System.out.println("線程隊列大小爲-->"+threadSize);  
  8.         if (threadSize==4){  
  9.             queue.add(new Runnable() {  //隊列已滿,拋異常   
  10.                 @Override  
  11.                 public void run(){  
  12.                     System.out.println("我是新線程,看看能不能搭個車加進去!");  
  13.                       
  14.                 }  
  15.             });  
  16.         }  
  17.     }     
  18.     executor.shutdown();    
  19. }  
public static void main(String[] args) throws InterruptedException { 
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("線程隊列大小爲-->"+threadSize);
             if (threadSize==4){
                 queue.add(new Runnable() {  //隊列已滿,拋異常
                     @Override
                     public void run(){
                         System.out.println("我是新線程,看看能不能搭個車加進去!");
                         
                     }
                 });
             }
         }   
         executor.shutdown();  
     }
運行結果:

線程隊列大小爲-->0
線程名稱:pool-1-thread-1
線程隊列大小爲-->0
線程隊列大小爲-->1
線程隊列大小爲-->2
線程隊列大小爲-->3
線程隊列大小爲-->4
線程名稱:pool-1-thread-2
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(Unknown Source)
at java.util.concurrent.ArrayBlockingQueue.add(Unknown Source)
at ths.ThreadPoolTest.main(ThreadPoolTest.java:35)

線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1

很明顯,當線程隊列已滿,即線程隊列裏的線程數爲4時,拋了異常,add線程失敗。再來看public boolean offer(E e) 方法測試程序:

  1. <STRONG> </STRONG>public static void main(String[] args) throws InterruptedException {   
  2.          BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列   
  3.          ThreadPoolExecutor executor = new ThreadPoolExecutor(261, TimeUnit.DAYS, queue);  
  4.          for (int i = 0; i < 10; i++) {     
  5.              executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));     
  6.              int threadSize = queue.size();  
  7.              System.out.println("線程隊列大小爲-->"+threadSize);  
  8.              if (threadSize==4){  
  9.                  final boolean flag = queue.offer(new Runnable() {  
  10.                      @Override  
  11.                      public void run(){  
  12.                          System.out.println("我是新線程,看看能不能搭個車加進去!");  
  13.                            
  14.                      }  
  15.                  });  
  16.                  System.out.println("添加新線程標誌爲-->"+flag);  
  17.              }  
  18.          }     
  19.          executor.shutdown();    
  20.      }  
 public static void main(String[] args) throws InterruptedException { 
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("線程隊列大小爲-->"+threadSize);
             if (threadSize==4){
                 final boolean flag = queue.offer(new Runnable() {
                     @Override
                     public void run(){
                         System.out.println("我是新線程,看看能不能搭個車加進去!");
                         
                     }
                 });
                 System.out.println("添加新線程標誌爲-->"+flag);
             }
         }   
         executor.shutdown();  
     }
運行結果以下:

線程隊列大小爲-->0
線程名稱:pool-1-thread-1
線程隊列大小爲-->0
線程隊列大小爲-->1
線程隊列大小爲-->2
線程隊列大小爲-->3
線程隊列大小爲-->4
添加新線程標誌爲-->false
線程隊列大小爲-->4
線程名稱:pool-1-thread-3
添加新線程標誌爲-->false
線程名稱:pool-1-thread-2
線程隊列大小爲-->4
添加新線程標誌爲-->false
線程名稱:pool-1-thread-4
線程隊列大小爲-->4
添加新線程標誌爲-->false
線程名稱:pool-1-thread-5
線程隊列大小爲-->4
添加新線程標誌爲-->false
線程名稱:pool-1-thread-6
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-4

能夠看到,當線程隊列已滿的時候,線程沒有被添加到線程隊列,程序也沒有拋異常。繼續看public void put(E e) throws InterruptedException;方法測試程序:

  1. public static void main(String[] args) throws InterruptedException {   
  2.          BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列   
  3.          ThreadPoolExecutor executor = new ThreadPoolExecutor(261, TimeUnit.DAYS, queue);  
  4.          for (int i = 0; i < 10; i++) {     
  5.              executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));     
  6.              int threadSize = queue.size();  
  7.              System.out.println("線程隊列大小爲-->"+threadSize);  
  8.              if (threadSize==4){  
  9.                  queue.put(new Runnable() {  
  10.                      @Override  
  11.                      public void run(){  
  12.                          System.out.println("我是新線程,看看能不能搭個車加進去!");  
  13.                      }  
  14.                  });  
  15.              }  
  16.          }     
  17.          executor.shutdown();    
  18.      }  
public static void main(String[] args) throws InterruptedException { 
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定爲4的線程隊列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("線程隊列大小爲-->"+threadSize);
             if (threadSize==4){
                 queue.put(new Runnable() {
                     @Override
                     public void run(){
                         System.out.println("我是新線程,看看能不能搭個車加進去!");
                     }
                 });
             }
         }   
         executor.shutdown();  
     }
結果以下:

線程隊列大小爲-->0
線程隊列大小爲-->0
線程隊列大小爲-->1
線程隊列大小爲-->2
線程隊列大小爲-->3
線程隊列大小爲-->4
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程隊列大小爲-->3
線程隊列大小爲-->4
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-2
線程隊列大小爲-->4
線程名稱:pool-1-thread-1
線程隊列大小爲-->4
我是新線程,看看能不能搭個車加進去!
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-4
我是新線程,看看能不能搭個車加進去!
線程名稱:pool-1-thread-3
我是新線程,看看能不能搭個車加進去!
我是新線程,看看能不能搭個車加進去!

很明顯,嘗試了四次才加進去,前面三次嘗試添加,但因爲線程sleep(3000),因此沒有執行完,線程隊列一直處於滿的狀態,直到某個線程執行完,隊列有空位,新線程才加進去,沒空位以前一直阻塞(即等待),我能加進去爲止。


那麼線程池的排除策略是什麼樣呢,通常按以下規律執行:

A.  若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
B.  若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
C.  若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。


總結:

1. 線程池可當即運行的最大線程數 即maximumPoolSize 參數。

2. 線程池能包含的最大線程數 = 可當即運行的最大線程數 + 線程隊列大小 (一部分當即運行,一部分裝隊列裏等待)

3. 核心線程數可理解爲建議值,即建議使用的線程數,或者依據CPU核數

4. add,offer,put三種添加線程到隊列的方法只在隊列滿的時候有區別,add爲拋異常,offer返回boolean值,put直到添加成功爲止。

5.同理remove,poll, take三種移除隊列中線程的方法只在隊列爲空的時候有區別, remove爲拋異常,poll爲返回boolean值, take等待直到有線程能夠被移除。

看看下面這張圖就清楚了:


記在這裏作爲學習的過程,之後偶爾有空翻起來容易。

相關文章
相關標籤/搜索