[Java多線程]-線程池的基本使用和部分源碼解析(建立,執行原理)

前面的文章:多線程爬坑之路-學習多線程須要來了解哪些東西?(concurrent併發包的數據結構和線程池,Locks鎖,Atomic原子類)html

      多線程爬坑之路-Thread和Runable源碼解析java

      多線程爬坑之路-Thread和Runable源碼解析之基本方法的運用實例es6

一.線程池ThreadPool的基本定義?

  線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。線程池線程都是後臺線程。每一個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。若是某個線程在託管代碼中空閒(如正在等待某個事件),則線程池將插入另外一個輔助線程來使全部處理器保持繁忙。若是全部線程池線程都始終保持繁忙,但隊列中包含掛起的工做,則線程池將在一段時間後建立另外一個輔助線程但線程的數目永遠不會超過最大值。超過最大值的線程能夠排隊,但他們要等到其餘線程完成後才啓動。web

二.線程池的背景及做用?

  線程池是用來解決多任務併發,頻繁建立線程和銷燬線程致使資源的浪費和效率低下的一種技術。相似於數據庫鏈接池,這種池化技術用於將部分資源重複利用,或者說是用已有的資源對多個任務進行服務。線程池就是建立固定的線程數量,服務於併發的任務,避免每個任務,系統都進行線程的建立和銷燬,增長系統的負擔,線程數量過多時會形成內存溢出。數據庫

三.線程池的運用場景:

  頻繁的任務建立,快速的任務響應,同等的任務優先級。若是須要很長時間纔有一個任務須要執行,那樣建立線程池就是浪費資源,若是執行一個任務須要幾個小時,那這個任務在線程池中執行一次就會阻塞大量的線程。任務的優先級在線程池沒有做用,由於線程池的任務執行順序是隊列。若是咱們須要執行的任務有輕重緩急,那麼不建議使用線程池。數據結構

四.線程池的基本類圖

五.線程池的建立過程:

線程池的類圖中咱們能夠獲得最終的兩個實現類,ThreadPoolExeutorScheduleThreadPoolExeutor,這兩個類均可以建立一個線程池。可是一般咱們使用這兩個類的封裝類Exeutors來建立一個線程池。多線程

一、TheadPoolExeutor:運用這個類咱們須要傳入四個參數

  • corePoolsize:核心線程數量
  • maximumPoolSize:最大線程數量
  • keepAliveTime:最大存活時間(當一個線程沒有任務執行時最大的存活時間)
  • until:用於格式化時間,指定時間類型。TimeUnit:一個枚舉類,詳情請看:http://www.importnew.com/7219.html
  • workQueue:任務隊列的接口聲明(BlockingQueue<Runnable>是一個繼承Queue的接口)
1 public ThreadPoolExecutor(int corePoolSize,
2                               int maximumPoolSize,
3                               long keepAliveTime,
4                               TimeUnit unit,
5                               BlockingQueue<Runnable> workQueue) {
6         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
7              Executors.defaultThreadFactory(), defaultHandler);
8     }

二、SheduleThreadPoolExeutor:這個類的四個構造函數能夠根據不一樣的參數構造不一樣用途的線程池。

 public ScheduledThreadPoolExecutor(int corePoolSize) {
         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, //調用的是super()方法來構造線程池。
               new DelayedWorkQueue());
     }
   public ScheduledThreadPoolExecutor(int corePoolSize,
                                        RejectedExecutionHandler handler) {
         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
               new DelayedWorkQueue(), handler);
     }
  public ScheduledThreadPoolExecutor(int corePoolSize,
                                        ThreadFactory threadFactory) {
         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
               new DelayedWorkQueue(), threadFactory);
     }
 public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

三、Exeutors:這個類主要是對以上兩個類作了包裝,經過這個類各類靜態方法咱們能夠建立知足各類需求的線程池。Exeutors類經過傳遞不一樣的參數以達到建立不一樣類型線程池的目的。

Exeutors的方法摘要列表:(省略了部分)完整的類你們能夠看源碼,API:http://www.javaweb.cc/help/JavaAPI1.6/java/util/concurrent/Executors.html併發

方法摘要
static Callable<Object> callable(PrivilegedAction<?> action) 
          返回 Callable 對象,調用它時可運行給定特權的操做並返回其結果。
static Callable<Object> callable(PrivilegedExceptionAction<?> action) 
          返回 Callable 對象,調用它時可運行給定特權的異常操做並返回其結果。
static Callable<Object> callable(Runnable task) 
          返回 Callable 對象,調用它時可運行給定的任務並返回 null
static
<T> Callable<T>
callable(Runnable task, T result) 
          返回 Callable 對象,調用它時可運行給定的任務並返回給定的結果。
static ThreadFactory defaultThreadFactory() 
          返回用於建立新線程的默認線程工廠。
static ExecutorService newCachedThreadPool() 
          建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時將重用它們。
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) 
          建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時將重用它們,並在須要時使用提供的 ThreadFactory 建立新線程。
static ExecutorService newFixedThreadPool(int nThreads) 
          建立一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程。
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 
          建立一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程,在須要時使用提供的 ThreadFactory 建立新線程。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 
          建立一個線程池,它可安排在給定延遲後運行命令或者按期地執行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) 
          建立一個線程池,它可安排在給定延遲後運行命令或者按期地執行。
static ExecutorService newSingleThreadExecutor() 
          建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) 
          建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程,並在須要時使用提供的 ThreadFactory 建立新線程。
static ScheduledExecutorService newSingleThreadScheduledExecutor() 
          建立一個單線程執行程序,它可安排在給定延遲後運行命令或者按期地執行。
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) 
          建立一個單線程執行程序,它可安排在給定延遲後運行命令或者按期地執行。
static
<T> Callable<T>
privilegedCallable(Callable<T> callable) 
          返回 Callable 對象,調用它時可在當前的訪問控制上下文中執行給定的 callable 對象。
static
<T> Callable<T>
privilegedCallableUsingCurrentClassLoader(Callable<T> callable) 
          返回 Callable 對象,調用它時可在當前的訪問控制上下文中,使用當前上下文類加載器做爲上下文類加載器來執行給定的 callable 對象。
static ThreadFactory privilegedThreadFactory() 
          返回用於建立新線程的線程工廠,這些新線程與當前線程具備相同的權限。
static ExecutorService unconfigurableExecutorService(ExecutorService executor) 
          返回一個將全部已定義的 ExecutorService 方法委託給指定執行程序的對象,可是使用強制轉換可能沒法訪問其餘方法。
static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) 
          返回一個將全部已定義的 ExecutorService 方法委託給指定執行程序的對象,可是使用強制轉換可能沒法訪問其餘方法。

列表中紅色部分就是一些經常使用的建立線程池的方法,這些方法的類型分爲兩類scheduleExecutorService和ExecutorService,這是兩個接口,async

scheduleExecutorService類型的線程池分兩類:ide

  1.newScheduleThreadPool:是有延遲執行或者按期執行的任務的線程池

  2.newSingleThreadScheduleExecutor:是有延遲執行或者按期執行的任務的單線程線程池

ExecutorService類型的線程池分三類:

  1.newCacheThreadPool:可重用,可擴展(按需建立)

  2.newFixedThreadPool:可重用,固定數量,共享的無界隊列

  3.newSingleThreadExeutor:單線程,無界隊列                      

Exeutors可經過調用上面五種封裝方法建立不一樣應用場景的線程池,這些方法的具體實現源碼以下:

一、newCachedThreadPool源碼:

  • 調用ThreadPoolExecutor建立線程:
    • 第一個參數爲0表示他的核心線程數是0,可根據任務須要建立線程,每個使用完得線程具備60秒的存活時間,
    • 第二個參數最大線程數:Integer.MAX_VALUE,int類型的最大值。
    • 第三個參數最大存活時間:60秒,第四個參數:時間的單位是SECONDS,秒。
    • 最後一個表示任務隊列:SynchronousQueue(這個類的實現後面再看),這裏沒有指定線程工廠在ThreadPoolExecutor類中會使用默認的工廠。
 1 /**
 2      * Creates a thread pool that creates new threads as needed, but
 3      * will reuse previously constructed threads when they are
 4      * available.  These pools will typically improve the performance
 5      * of programs that execute many short-lived asynchronous tasks.
 6      * Calls to {@code execute} will reuse previously constructed
 7      * threads if available. If no existing thread is available, a new
 8      * thread will be created and added to the pool. Threads that have
 9      * not been used for sixty seconds are terminated and removed from
10      * the cache. Thus, a pool that remains idle for long enough will
11      * not consume any resources. Note that pools with similar
12      * properties but different details (for example, timeout parameters)
13      * may be created using {@link ThreadPoolExecutor} constructors.
14      *
15      * @return the newly created thread pool
16      */
17     public static ExecutorService newCachedThreadPool() {
18         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
19                                       60L, TimeUnit.SECONDS,
20                                       new SynchronousQueue<Runnable>());
21     }
  • 有線程工廠做爲參數:與上面惟一不一樣的是多了一個參數,threadFactory會替代掉默認的工廠來建立線程。
 1  /**
 2      * Creates a thread pool that creates new threads as needed, but
 3      * will reuse previously constructed threads when they are
 4      * available, and uses the provided
 5      * ThreadFactory to create new threads when needed.
 6      * @param threadFactory the factory to use when creating new threads
 7      * @return the newly created thread pool
 8      * @throws NullPointerException if threadFactory is null
 9      */
10     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
11         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
12                                       60L, TimeUnit.SECONDS,
13                                       new SynchronousQueue<Runnable>(),
14                                       threadFactory);
15     }

其餘的類我就不一一列出源碼了,給出下面的表格,能夠一目瞭然的看到他們之間的建立差別。(其中忽略掉線程工廠Threadfactory和Hanler(對於沒法經過一個線程池執行任務的處理程序))由於他們沒法對線程池的差別形成影響。

靜態方法中的參數主要有三個:ThreadFactor,corePoolsize,Handler.其中corePoolSize最爲重要,設置的數量要根據具體的須要而定。過少會形成線程阻塞性能降低,過多會形成資源浪費。

Executors

     靜態方法       

          調用方法(new)      

                                                                調用方法Executor的參數

corePoolSize

 maximumPoolSize

keepAliveTime

 TimeUnit   

    workQueue

newCacheThreadPool

ThreadPoolExecutor

0

Integer.MAX_VALUE 

60s

 SECONDS(秒)      

SynchronousQueue

newFixedThreadPool

ThreadPoolExecutor

n(參數)

n(參數)

0s

 MILLISECONDS

LinkedBlockingQueue

newSingleThreadExecutor

ThreadPoolExecutor

1

1

0s

MILLISECONDS

LinkedBlockingQueue

newSingleThreadScheduledExecutor

ScheduledThreadPoolExecutor

1

Integer.MAX_VALUE 

0s

NANOSECONDS

DelayedWorkQueue

newScheduledThreadPool

ScheduledThreadPoolExecutor

n(參數)

Integer.MAX_VALUE 

0s

NANOSECONDS

DelayedWorkQueue

Integer.MAX_VALUE=int型的最大值,2^32-1=2147483647

四、如何選擇建立線程池的類型?

  corePoolSize:保證了最少有多少個存活的線程。若是咱們的任務很穩定,一直可以保證最少x個任務須要被執行,那咱們能夠將這個值設置爲x,若是咱們的任務有時多有時少,有時候沒有,咱們能夠設置爲0,讓它自動根據須要建立線程,這就是newCacheThreadPool的方法,由於不肯定接下來有沒有任務,因此這些線程使用完以後保持60秒的活時間,若是有任務則重用這些線程,沒任務則60秒後線程銷燬。若是咱們的任務中,始終有一個任務須要執行,可是有時會有更多的線程,那咱們能夠選擇newSingleThreadScheduledExecutor,他保證了corePoolSize=1,始終有一個線程等待任務。最大值爲Integer.MAX_VALUE。咱們能夠根據本身的場景須要選擇合適的線程池。

核心方法從Executors轉移到了ThreadPoolExecutor類和ScheduledThreadPoolExecutor類上。下面深刻一下構建一個線程池須要的步驟和準備。理解了他是如何建立一個線程池以後咱們能夠手動寫本身的線程池。

六、ThreadPoolExecutor類視圖:

總結一下類裏面就是三個部分:

  1.構造方法:第一個構造方法部分咱們已經研究過了

  2.屬性操做:第二個部分屬性操做,就是獲取咱們構造的線程池的一些屬性,基本就是構造方法中的參數。

  3.線程池管理:主要仍是內部的調用,去對線程的建立和管理,對任務的執行和監控。

線程池一旦建立好了只須要往裏面添加任務執行就行了。而咱們須要關心的就是建立的線程池性能是否足夠,是否能夠優化。

示例:建立線程池

1 import java.util.concurrent.ThreadFactory;
2 
3 public class ThreadFactorImpl implements ThreadFactory{//這裏本身實現了一個線程工廠,只是爲了下面建立線程池作準備,你們能夠忽略掉默認的工廠沒這麼簡單。後面給源碼
4     public Thread newThread(Runnable r) {
5         return new Thread(r);
6     }
7 }
 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 import java.util.concurrent.ScheduledThreadPoolExecutor;
 4 import java.util.concurrent.SynchronousQueue;
 5 import java.util.concurrent.ThreadPoolExecutor;
 6 import java.util.concurrent.TimeUnit;
 7 
 8 public class TestThreadPool {
 9     private int num;
10 
11     public int test() {
12         return num++;
13     }
14     public static void main(String[] args) {
15         ExecutorService es1 = Executors.newCachedThreadPool();
16         ExecutorService es2 = Executors.newCachedThreadPool(new ThreadFactorImpl());
17         ExecutorService es3 = Executors.newFixedThreadPool(10);
18         ExecutorService es4 = Executors.newFixedThreadPool(10, new ThreadFactorImpl());
19         ExecutorService es5 = Executors.newSingleThreadExecutor();
20         ExecutorService es6 = Executors.newSingleThreadExecutor(new ThreadFactorImpl());
21         ExecutorService es7 = Executors.newScheduledThreadPool(10);
22         ExecutorService es8 = Executors.newScheduledThreadPool(10, new ThreadFactorImpl());
23         ExecutorService es9 = new ScheduledThreadPoolExecutor(10);
24         ExecutorService es10 = new ThreadPoolExecutor(1, 10, 20, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
25     }
26 }

當Executors的構造方法知足不了需求的時候,就須要直接經過原始類ThreadPoolExecutor類和ScheduledThreadPoolExecutor類來傳入合適的參數。

defaultThreadFactory默認的線程工廠:

 1 static class DefaultThreadFactory implements ThreadFactory {
 2         private static final AtomicInteger poolNumber = new AtomicInteger(1);
 3         private final ThreadGroup group;
 4         private final AtomicInteger threadNumber = new AtomicInteger(1);
 5         private final String namePrefix;
 6 
 7         DefaultThreadFactory() {
 8             SecurityManager s = System.getSecurityManager();
 9             group = (s != null) ? s.getThreadGroup() :
10                                   Thread.currentThread().getThreadGroup();
11             namePrefix = "pool-" +
12                           poolNumber.getAndIncrement() +
13                          "-thread-";
14         }
15 
16         public Thread newThread(Runnable r) {
17             Thread t = new Thread(group, r,
18                                   namePrefix + threadNumber.getAndIncrement(),
19                                   0);
20             if (t.isDaemon())
21                 t.setDaemon(false);
22             if (t.getPriority() != Thread.NORM_PRIORITY)
23                 t.setPriority(Thread.NORM_PRIORITY);
24             return t;
25         }
26     }

建立了線程池,有了線程工廠,當任務量>線程數的時候,工廠就會建立線程去執行任務。

6、線程池是如何管理和執行任務的?

執行任務只須要調用execute()方法就夠了,下面咱們建立了四個任務分別給各自的對象num數據+1。而後把任務加入線程池,execute方法其實是加入工做隊列的過程。

示例:execute執行任務

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 public class TestThreadPool implements Runnable{
 5     private int num;
 6     public TestThreadPool(int num){
 7         this.num = num;
 8     }
 9     public int getNum() {
10         return num;
11     }
12     public void setNum(int num) {
13         this.num = num;
14     }
15     public void run() {
16         setNum(++this.num);
17     }
18     public static void main(String[] args) {
19         ExecutorService es1 = Executors.newCachedThreadPool();
20         //任務t1,t2,t3,t4
21         TestThreadPool t1 = new TestThreadPool(1);
22         TestThreadPool t2 = new TestThreadPool(2);
23         TestThreadPool t3 = new TestThreadPool(3);
24         TestThreadPool t4 = new TestThreadPool(4);
25         es1.execute(t1);
26         es1.execute(t2);
27         es1.execute(t3);
28         es1.execute(t4);
29         try {
30             Thread.sleep(1000);
31         } catch (InterruptedException e) {
32             e.printStackTrace();
33         }
34         System.out.println(t1.getNum());
35         System.out.println(t2.getNum());
36         System.out.println(t3.getNum());
37         System.out.println(t4.getNum());
38     }
39 }

執行結果以下:跟咱們預想的結果相同。

2
3
4
5

execute方法源碼:

因爲執行任務的線程是內部工廠建立的因此咱們沒法觀測到線程的狀態和屬性,經過Execute的源碼來看一下它的執行過程:(不一樣的建立方法對應了不一樣的線程池類,同時也對應了不一樣的execute方法)下面是ThreadPoolExecutor類的execute方法源碼

 1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * Proceed in 3 steps:
 6          *
 7          * 1. If fewer than corePoolSize threads are running, try to
 8          * start a new thread with the given command as its first
 9          * task.  The call to addWorker atomically checks runState and
10          * workerCount, and so prevents false alarms that would add
11          * threads when it shouldn't, by returning false.
12          *
13          * 2. If a task can be successfully queued, then we still need
14          * to double-check whether we should have added a thread
15          * (because existing ones died since last checking) or that
16          * the pool shut down since entry into this method. So we
17          * recheck state and if necessary roll back the enqueuing if
18          * stopped, or start a new thread if there are none.
19          *
20          * 3. If we cannot queue task, then we try to add a new
21          * thread.  If it fails, we know we are shut down or saturated
22          * and so reject the task.
23          */
24         int c = ctl.get();
25         if (workerCountOf(c) < corePoolSize) {
26             if (addWorker(command, true))
27                 return;
28             c = ctl.get();
29         }
30         if (isRunning(c) && workQueue.offer(command)) {
31             int recheck = ctl.get();
32             if (! isRunning(recheck) && remove(command))
33                 reject(command);
34             else if (workerCountOf(recheck) == 0)
35                 addWorker(null, false);
36         }
37         else if (!addWorker(command, false))
38             reject(command);
39     }

英語好的直接看源碼的註釋部分,註釋中說:

excute的過程主要分三步:

  1. 若是當前正在運行的線程數小於corePoolSize的話,那麼嘗試使用傳入的任務(command)做爲第一個任務啓動一個新的線程執行。addWorker函數會原子性的檢查線程池的runState以及workerCount防止不該該添加的新線程被添加。若是是假警報的話,那麼addWorker函數就會返回false,表示添加新線程失敗。
  2. 若是一個Task被成功的加入隊列了,而後仍然須要從新check一次是否須要從新添加一個線程,由於有可能在上一次檢查到此次檢查之間,已經存在的線程已經死亡。或者,自從進入這個方法後,線程池已經被關閉(shut down)。因此咱們須要從新check狀態,而且在必要的時候,若是處於stopped狀態,須要從新回滾到隊列中,或者若是沒有的話,就須要從新啓動一個線程。
  3. 若是不能把task加入到隊列中,那麼就會嘗試去添加一個新的線程,若是它失敗了,就知道是已經當前線程池是處於shut down或者處於飽和狀態,那麼就執行拒絕(reject)操做。

  簡單的來講:就是檢查線程池的狀態,看是否可以把任務加入到隊列或者是否須要用一個線程去執行它,最終就是,加入隊列等待,或者加入隊列執行,或者說拒絕(線程池狀態問題。。)

  再往下深刻,就要到虛擬機的實現機制上的一些東西了包括同步機制,鎖,鎖優化問題,等等。至少到目前爲止,咱們知道了線程池的使用和它的重點在哪裏,怎樣根據需求去建立一個合適的線程池。

  後面的話就要進入併發問題了,到了線程的難點部分。

相關文章
相關標籤/搜索