前面的文章:多線程爬坑之路-學習多線程須要來了解哪些東西?(concurrent併發包的數據結構和線程池,Locks鎖,Atomic原子類)html
多線程爬坑之路-Thread和Runable源碼解析java
多線程爬坑之路-Thread和Runable源碼解析之基本方法的運用實例es6
線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。線程池線程都是後臺線程。每一個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。若是某個線程在託管代碼中空閒(如正在等待某個事件),則線程池將插入另外一個輔助線程來使全部處理器保持繁忙。若是全部線程池線程都始終保持繁忙,但隊列中包含掛起的工做,則線程池將在一段時間後建立另外一個輔助線程但線程的數目永遠不會超過最大值。超過最大值的線程能夠排隊,但他們要等到其餘線程完成後才啓動。web
線程池是用來解決多任務併發,頻繁建立線程和銷燬線程致使資源的浪費和效率低下的一種技術。相似於數據庫鏈接池,這種池化技術用於將部分資源重複利用,或者說是用已有的資源對多個任務進行服務。線程池就是建立固定的線程數量,服務於併發的任務,避免每個任務,系統都進行線程的建立和銷燬,增長系統的負擔,線程數量過多時會形成內存溢出。數據庫
頻繁的任務建立,快速的任務響應,同等的任務優先級。若是須要很長時間纔有一個任務須要執行,那樣建立線程池就是浪費資源,若是執行一個任務須要幾個小時,那這個任務在線程池中執行一次就會阻塞大量的線程。任務的優先級在線程池沒有做用,由於線程池的任務執行順序是隊列。若是咱們須要執行的任務有輕重緩急,那麼不建議使用線程池。數據結構
線程池的類圖中咱們能夠獲得最終的兩個實現類,ThreadPoolExeutor和ScheduleThreadPoolExeutor,這兩個類均可以建立一個線程池。可是一般咱們使用這兩個類的封裝類Exeutors來建立一個線程池。多線程
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue) { 6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 7 Executors.defaultThreadFactory(), defaultHandler); 8 }
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, //調用的是super()方法來構造線程池。 new DelayedWorkQueue()); }
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); }
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); }
Exeutors的方法摘要列表:(省略了部分)完整的類你們能夠看源碼,API:http://www.javaweb.cc/help/JavaAPI1.6/java/util/concurrent/Executors.html併發
方法摘要 | ||
---|---|---|
static Callable<Object> |
callable(PrivilegedAction<?> action) 返回 Callable 對象,調用它時可運行給定特權的操做並返回其結果。 |
|
static Callable<Object> |
callable(PrivilegedExceptionAction<?> action) 返回 Callable 對象,調用它時可運行給定特權的異常操做並返回其結果。 |
|
static Callable<Object> |
callable(Runnable task) 返回 Callable 對象,調用它時可運行給定的任務並返回 null。 |
|
static
|
callable(Runnable task, T result) 返回 Callable 對象,調用它時可運行給定的任務並返回給定的結果。 |
|
static ThreadFactory |
defaultThreadFactory() 返回用於建立新線程的默認線程工廠。 |
|
static ExecutorService |
newCachedThreadPool() 建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時將重用它們。 |
|
static ExecutorService |
newCachedThreadPool(ThreadFactory threadFactory) 建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時將重用它們,並在須要時使用提供的 ThreadFactory 建立新線程。 |
|
static ExecutorService |
newFixedThreadPool(int nThreads) 建立一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程。 |
|
static ExecutorService |
newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 建立一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程,在須要時使用提供的 ThreadFactory 建立新線程。 |
|
static ScheduledExecutorService |
newScheduledThreadPool(int corePoolSize) 建立一個線程池,它可安排在給定延遲後運行命令或者按期地執行。 |
|
static ScheduledExecutorService |
newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) 建立一個線程池,它可安排在給定延遲後運行命令或者按期地執行。 |
|
static ExecutorService |
newSingleThreadExecutor() 建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。 |
|
static ExecutorService |
newSingleThreadExecutor(ThreadFactory threadFactory) 建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程,並在須要時使用提供的 ThreadFactory 建立新線程。 |
|
static ScheduledExecutorService |
newSingleThreadScheduledExecutor() 建立一個單線程執行程序,它可安排在給定延遲後運行命令或者按期地執行。 |
|
static ScheduledExecutorService |
newSingleThreadScheduledExecutor(ThreadFactory threadFactory) 建立一個單線程執行程序,它可安排在給定延遲後運行命令或者按期地執行。 |
|
static
|
privilegedCallable(Callable<T> callable) 返回 Callable 對象,調用它時可在當前的訪問控制上下文中執行給定的 callable 對象。 |
|
static
|
privilegedCallableUsingCurrentClassLoader(Callable<T> callable) 返回 Callable 對象,調用它時可在當前的訪問控制上下文中,使用當前上下文類加載器做爲上下文類加載器來執行給定的 callable 對象。 |
|
static ThreadFactory |
privilegedThreadFactory() 返回用於建立新線程的線程工廠,這些新線程與當前線程具備相同的權限。 |
|
static ExecutorService |
unconfigurableExecutorService(ExecutorService executor) 返回一個將全部已定義的 ExecutorService 方法委託給指定執行程序的對象,可是使用強制轉換可能沒法訪問其餘方法。 |
|
static ScheduledExecutorService |
unconfigurableScheduledExecutorService(ScheduledExecutorService executor) 返回一個將全部已定義的 ExecutorService 方法委託給指定執行程序的對象,可是使用強制轉換可能沒法訪問其餘方法。 |
列表中紅色部分就是一些經常使用的建立線程池的方法,這些方法的類型分爲兩類scheduleExecutorService和ExecutorService,這是兩個接口,async
scheduleExecutorService類型的線程池分兩類:ide
1.newScheduleThreadPool:是有延遲執行或者按期執行的任務的線程池
2.newSingleThreadScheduleExecutor:是有延遲執行或者按期執行的任務的單線程線程池
ExecutorService類型的線程池分三類:
1.newCacheThreadPool:可重用,可擴展(按需建立)
2.newFixedThreadPool:可重用,固定數量,共享的無界隊列
3.newSingleThreadExeutor:單線程,無界隊列
1 /** 2 * Creates a thread pool that creates new threads as needed, but 3 * will reuse previously constructed threads when they are 4 * available. These pools will typically improve the performance 5 * of programs that execute many short-lived asynchronous tasks. 6 * Calls to {@code execute} will reuse previously constructed 7 * threads if available. If no existing thread is available, a new 8 * thread will be created and added to the pool. Threads that have 9 * not been used for sixty seconds are terminated and removed from 10 * the cache. Thus, a pool that remains idle for long enough will 11 * not consume any resources. Note that pools with similar 12 * properties but different details (for example, timeout parameters) 13 * may be created using {@link ThreadPoolExecutor} constructors. 14 * 15 * @return the newly created thread pool 16 */ 17 public static ExecutorService newCachedThreadPool() { 18 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 19 60L, TimeUnit.SECONDS, 20 new SynchronousQueue<Runnable>()); 21 }
1 /** 2 * Creates a thread pool that creates new threads as needed, but 3 * will reuse previously constructed threads when they are 4 * available, and uses the provided 5 * ThreadFactory to create new threads when needed. 6 * @param threadFactory the factory to use when creating new threads 7 * @return the newly created thread pool 8 * @throws NullPointerException if threadFactory is null 9 */ 10 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { 11 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 12 60L, TimeUnit.SECONDS, 13 new SynchronousQueue<Runnable>(), 14 threadFactory); 15 }
其餘的類我就不一一列出源碼了,給出下面的表格,能夠一目瞭然的看到他們之間的建立差別。(其中忽略掉線程工廠Threadfactory和Hanler(對於沒法經過一個線程池執行任務的處理程序))由於他們沒法對線程池的差別形成影響。
Executors |
||||||
靜態方法 |
調用方法(new) |
調用方法Executor的參數 |
||||
corePoolSize |
maximumPoolSize |
keepAliveTime |
TimeUnit |
workQueue |
||
newCacheThreadPool |
ThreadPoolExecutor |
0 |
Integer.MAX_VALUE |
60s |
SECONDS(秒) |
SynchronousQueue |
newFixedThreadPool |
ThreadPoolExecutor |
n(參數) |
n(參數) |
0s |
MILLISECONDS |
LinkedBlockingQueue |
newSingleThreadExecutor |
ThreadPoolExecutor |
1 |
1 |
0s |
MILLISECONDS |
LinkedBlockingQueue |
newSingleThreadScheduledExecutor |
ScheduledThreadPoolExecutor |
1 |
Integer.MAX_VALUE |
0s |
NANOSECONDS |
DelayedWorkQueue |
newScheduledThreadPool |
ScheduledThreadPoolExecutor |
n(參數) |
Integer.MAX_VALUE |
0s |
NANOSECONDS |
DelayedWorkQueue |
Integer.MAX_VALUE=int型的最大值,2^32-1=2147483647
corePoolSize:保證了最少有多少個存活的線程。若是咱們的任務很穩定,一直可以保證最少x個任務須要被執行,那咱們能夠將這個值設置爲x,若是咱們的任務有時多有時少,有時候沒有,咱們能夠設置爲0,讓它自動根據須要建立線程,這就是newCacheThreadPool的方法,由於不肯定接下來有沒有任務,因此這些線程使用完以後保持60秒的活時間,若是有任務則重用這些線程,沒任務則60秒後線程銷燬。若是咱們的任務中,始終有一個任務須要執行,可是有時會有更多的線程,那咱們能夠選擇newSingleThreadScheduledExecutor,他保證了corePoolSize=1,始終有一個線程等待任務。最大值爲Integer.MAX_VALUE。咱們能夠根據本身的場景須要選擇合適的線程池。
核心方法從Executors轉移到了ThreadPoolExecutor類和ScheduledThreadPoolExecutor類上。下面深刻一下構建一個線程池須要的步驟和準備。理解了他是如何建立一個線程池以後咱們能夠手動寫本身的線程池。
總結一下類裏面就是三個部分:
1.構造方法:第一個構造方法部分咱們已經研究過了
2.屬性操做:第二個部分屬性操做,就是獲取咱們構造的線程池的一些屬性,基本就是構造方法中的參數。
3.線程池管理:主要仍是內部的調用,去對線程的建立和管理,對任務的執行和監控。
線程池一旦建立好了只須要往裏面添加任務執行就行了。而咱們須要關心的就是建立的線程池性能是否足夠,是否能夠優化。
示例:建立線程池
1 import java.util.concurrent.ThreadFactory; 2 3 public class ThreadFactorImpl implements ThreadFactory{//這裏本身實現了一個線程工廠,只是爲了下面建立線程池作準備,你們能夠忽略掉默認的工廠沒這麼簡單。後面給源碼 4 public Thread newThread(Runnable r) { 5 return new Thread(r); 6 } 7 }
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.ScheduledThreadPoolExecutor; 4 import java.util.concurrent.SynchronousQueue; 5 import java.util.concurrent.ThreadPoolExecutor; 6 import java.util.concurrent.TimeUnit; 7 8 public class TestThreadPool { 9 private int num; 10 11 public int test() { 12 return num++; 13 } 14 public static void main(String[] args) { 15 ExecutorService es1 = Executors.newCachedThreadPool(); 16 ExecutorService es2 = Executors.newCachedThreadPool(new ThreadFactorImpl()); 17 ExecutorService es3 = Executors.newFixedThreadPool(10); 18 ExecutorService es4 = Executors.newFixedThreadPool(10, new ThreadFactorImpl()); 19 ExecutorService es5 = Executors.newSingleThreadExecutor(); 20 ExecutorService es6 = Executors.newSingleThreadExecutor(new ThreadFactorImpl()); 21 ExecutorService es7 = Executors.newScheduledThreadPool(10); 22 ExecutorService es8 = Executors.newScheduledThreadPool(10, new ThreadFactorImpl()); 23 ExecutorService es9 = new ScheduledThreadPoolExecutor(10); 24 ExecutorService es10 = new ThreadPoolExecutor(1, 10, 20, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); 25 } 26 }
當Executors的構造方法知足不了需求的時候,就須要直接經過原始類ThreadPoolExecutor類和ScheduledThreadPoolExecutor類來傳入合適的參數。
defaultThreadFactory默認的線程工廠:
1 static class DefaultThreadFactory implements ThreadFactory { 2 private static final AtomicInteger poolNumber = new AtomicInteger(1); 3 private final ThreadGroup group; 4 private final AtomicInteger threadNumber = new AtomicInteger(1); 5 private final String namePrefix; 6 7 DefaultThreadFactory() { 8 SecurityManager s = System.getSecurityManager(); 9 group = (s != null) ? s.getThreadGroup() : 10 Thread.currentThread().getThreadGroup(); 11 namePrefix = "pool-" + 12 poolNumber.getAndIncrement() + 13 "-thread-"; 14 } 15 16 public Thread newThread(Runnable r) { 17 Thread t = new Thread(group, r, 18 namePrefix + threadNumber.getAndIncrement(), 19 0); 20 if (t.isDaemon()) 21 t.setDaemon(false); 22 if (t.getPriority() != Thread.NORM_PRIORITY) 23 t.setPriority(Thread.NORM_PRIORITY); 24 return t; 25 } 26 }
建立了線程池,有了線程工廠,當任務量>線程數的時候,工廠就會建立線程去執行任務。
執行任務只須要調用execute()方法就夠了,下面咱們建立了四個任務分別給各自的對象num數據+1。而後把任務加入線程池,execute方法其實是加入工做隊列的過程。
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 4 public class TestThreadPool implements Runnable{ 5 private int num; 6 public TestThreadPool(int num){ 7 this.num = num; 8 } 9 public int getNum() { 10 return num; 11 } 12 public void setNum(int num) { 13 this.num = num; 14 } 15 public void run() { 16 setNum(++this.num); 17 } 18 public static void main(String[] args) { 19 ExecutorService es1 = Executors.newCachedThreadPool(); 20 //任務t1,t2,t3,t4 21 TestThreadPool t1 = new TestThreadPool(1); 22 TestThreadPool t2 = new TestThreadPool(2); 23 TestThreadPool t3 = new TestThreadPool(3); 24 TestThreadPool t4 = new TestThreadPool(4); 25 es1.execute(t1); 26 es1.execute(t2); 27 es1.execute(t3); 28 es1.execute(t4); 29 try { 30 Thread.sleep(1000); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 } 34 System.out.println(t1.getNum()); 35 System.out.println(t2.getNum()); 36 System.out.println(t3.getNum()); 37 System.out.println(t4.getNum()); 38 } 39 }
執行結果以下:跟咱們預想的結果相同。
2 3 4 5
因爲執行任務的線程是內部工廠建立的因此咱們沒法觀測到線程的狀態和屬性,經過Execute的源碼來看一下它的執行過程:(不一樣的建立方法對應了不一樣的線程池類,同時也對應了不一樣的execute方法)下面是ThreadPoolExecutor類的execute方法源碼
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); 25 if (workerCountOf(c) < corePoolSize) { 26 if (addWorker(command, true)) 27 return; 28 c = ctl.get(); 29 } 30 if (isRunning(c) && workQueue.offer(command)) { 31 int recheck = ctl.get(); 32 if (! isRunning(recheck) && remove(command)) 33 reject(command); 34 else if (workerCountOf(recheck) == 0) 35 addWorker(null, false); 36 } 37 else if (!addWorker(command, false)) 38 reject(command); 39 }
英語好的直接看源碼的註釋部分,註釋中說:
簡單的來講:就是檢查線程池的狀態,看是否可以把任務加入到隊列或者是否須要用一個線程去執行它,最終就是,加入隊列等待,或者加入隊列執行,或者說拒絕(線程池狀態問題。。)
再往下深刻,就要到虛擬機的實現機制上的一些東西了包括同步機制,鎖,鎖優化問題,等等。至少到目前爲止,咱們知道了線程池的使用和它的重點在哪裏,怎樣根據需求去建立一個合適的線程池。
後面的話就要進入併發問題了,到了線程的難點部分。