java併發基礎

1、建立線程html

  1.使用Executor:java.util.concurrent包中的執行器(Execute)爲你管理Thread對象,從而簡化併發編程。java

  

package thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class LiftOff implements Runnable{ public int countDown=10;//倒計時十個數
    public static int taskNumber=0;//靜態的,多個LiftOff對象共享這一個變量
    public final int id=taskNumber++;//id是惟一的
    public void status(){ System.out.println(id+"的倒計時是:"+countDown); } @Override public void run() { while (countDown-->0){ status(); Thread.yield(); } } public static void main(String[] args) { ExecutorService exe = Executors.newCachedThreadPool(); for(int i=0;i<5;i++){ exe.execute(new LiftOff()); }
       exe.shutdown();
} }

其中,shutdown方法的調用,是防止新任務被提交給這個Executor,當前線程(這裏指main線程),將繼續運行在shutdown被調用以前提交的任務,這個程序將在Executor中的全部任務完成以後儘快退出。程序員

    1.2.FixedThreadPool(5)編程

    1.3SingleThreadExecutor數組

   2.從任務中產生返回值緩存

    

package thread; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id){ this.id=id; } @Override public String call() throws Exception { return "Result id is:"+id; } public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); ArrayList<Future<String>> result = new ArrayList<Future<String>>(); for(int i=0;i<5;i++){ result.add(executorService.submit(new TaskWithResult(i))); } for (Future<String> i:result ) { try { System.out.println(i.get()); }catch (Exception e){ e.printStackTrace(); }finally { executorService.shutdown(); } } } }

submit方法會產生Future對象,它用Callable返回結果的特色類型進行了參數化,你能夠用isDone方法查詢Future是否已經完成。當任務完成 時,它有一個結果,你能夠調用get方法獲取結果,get將被阻塞,直至數據準備就緒。併發

  3。sleep()ide

  對sleep的調用能夠拋出InterruptException異常,而且你能夠看到,它在run中被捕獲,由於異常不能跨線程傳播回main(),全部你必須在本地處理全部在任務內部產生的異常。this

2、Executor Frame Work(java.util.concurrent.*)spa

  1.分離任務的建立和執行者的建立

  2.線程重複利用(new線程代價很大)

  共享線程池概念:預先設好多個Thread,可彈性增長,屢次執行不少很小的任務,任務建立和執行解耦。程序員無需關心線程池執行任務過程。

  Executor主要類:ExecutorService,ThreadPoolExecutor,Future

  Executors.newCachedThreadPool/newFixedThreadPool  固定大小/可動態改變

  Callable具體的邏輯對象(線程類)

  Future 返回結果

前面講的不是很系統,如今從線程池的最核心類ThreadPoolExecutor講起

1、Java中的ThreadPoolExecutor

  java.util.concurrent.ThreadPoolExecutor類是線程池中最核心的類,在其中提供了四個構造方法

public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }

從上面的代碼可知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,經過觀察每一個構造器的源碼實現,發現前面三個構造器都是調用第四個構造器進行初始化。

下面解釋一下各個構造器的各個參數

  corePoolSize:核心池的大小,這個參數跟後面講的線程池實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何數據,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreTherad()方法,即預建立線程,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列中。

  maximunPoolSize:線程池最大線程數,這個參數也是很是重要的,它表示線程池中最多能夠建立多少個線程。

  keepAliveTime:表示線程沒有執行任務時最多會保持多久時間會終止,默認狀況下,只有當線程池中線程數大於corePoolSize時,keepAliveTime纔會起做用,直到線程池中線程數不大於corePoolSize。可是若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中線程數不大於corePoolSize時,keepAliveTime也會起做用,直到線程池中線程數爲0。

  unit:參數keepAliveTime的時間單位,有七種取值,在TimeUnit中有7中靜態屬性

TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒

  workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生極大影響,通常來講,這裏的阻塞隊列有如下幾種選擇:

ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;

通常使用的時LinkedBlockingQueue與SynchronousQueue,線程池的排隊策略與BlockQueue有關。

  threadFactory:線程工廠,主要用來建立線程。

  handler:表示當拒絕處理任務時的策略,有如下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務 

ThreadPoolExecutor繼承AbstractExecutorService(抽象類),AbstractExecutorService實現了ExecutorService接口,ExecutorService接口又繼承了Executor接口。

public interface Executor { void execute(Runnable command); }

此接口中定義的方法時用來執行傳入的任務的。

而後ExecutorService接口繼承了Executor接口,並聲明瞭一些方法:submit,invokeAll,invokeAny以及shutDown等。

抽象類AbstractExecutorService實現了ExecutorService基本實現了ExecutorService中實現 的方法。

ThreadPoolExecutor幾個重要的方法:

execute() submit() shutdown() shutdownNow()

executor方法實際上時Executor中聲明的方法,經過這個方法能夠向線程池提交一個任務,交由線程池去管理。

submit()方法時ExecutorService中聲明的方法,在AbstractExecutorService中就已經有了具體實現,這個方法也是用來向線程池提交任務的,可是它和executor方法不一樣,它能返回任務執行的結果,它內部仍是用executor方法,只不過它利用了Future來獲取任務執行結果。

shutdown與shutdownNow是用來關閉線程池的。

還有不少其餘的方法:

  好比:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友能夠自行查閱API。

2、深刻剖析線程池實現原理

在上一節咱們從宏觀上介紹了ThreadPoolExecutor,下面咱們來深刻解析一下線程池的具體實現原理,將從下面幾個方面講解:

  1.線程池狀態

  2.任務的執行

  3.線程池中的線程初始化

  4.任務緩存隊列及排隊策略

  5.任務拒絕策略

  6.線程池的關閉

  7.線程池容量的動態調整

1.線程池狀態

  在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:

volatile int runState; static final int RUNNING    = 0; static final int SHUTDOWN   = 1; static final int STOP       = 2; static final int TERMINATED = 3;

RunState表示當前線程池的狀態,它是一個volatile變量來保證各線程之間的可見性

 下面的幾個static final 變量表示RunState可能的幾個取值

  當線程池建立之後,初始時,線程池處於RUNNING狀態;

  若是調用了shutdown方法後,則線程池處於SHUTDOWN狀態,此時線程池不能接受新任務,它會等待全部任務執行完畢。

  若是調用了shutdownNow方法,則線程處於STOP狀態,此時線程池不能接受新任務,而且回去嘗試終止正在執行的任務。

  當線程池處於SHUTDOWN或STOP狀態,而且全部的工做線程已經被銷燬,任務緩存隊列已經清空或執行結束後,線程池被置爲TERMINATED狀態。

2.執行的任務

  在瞭解將任務提交給線程池到任務執行完畢整個過程以前,咱們先來看一下ThreadPoolExecutor類中其餘的一些比較重要成員變量:

  

private final BlockingQueue<Runnable> workQueue;              //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態鎖,對線程池狀態(好比線程池大小 //、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工做集
 
private volatile long  keepAliveTime;    //線程存貨時間 
private volatile boolean allowCoreThreadTimeOut;   //是否容許爲核心線程設置存活時間
private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數
 
private volatile int   poolSize;       //線程池中當前的線程數
 
private volatile RejectedExecutionHandler handler; //任務拒絕策略
 
private volatile ThreadFactory threadFactory;   //線程工廠,用來建立線程
 
private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數
 
private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

每一個變量的做用都已經標明出來了,這裏要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

corePoolSize在不少地方被翻譯成核心池大小,其實個人理解這個就是線程池的大小。舉個簡單的例子:

  假若有一個工廠,工廠裏面有10個工人,每一個工人同時只能作一件任務。

  所以只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人作;

  當10個工人都有任務在作時,若是還來了任務,就把任務進行排隊等待;

  若是說新任務數目增加的速度遠遠大於工人作任務的速度,那麼此時工廠主管可能會想補救措施,好比從新招4個臨時工人進來;

  而後就將任務也分配給這4個臨時工人作;

  若是說着14個工人作任務的速度仍是不夠,此時工廠主管可能就要考慮再也不接收新的任務或者拋棄前面的一些任務了。

  當這14個工人當中有人空閒時,而新任務增加的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量忽然過大時的一種補救措施。

  不過爲了方便理解,在本文後面仍是將corePoolSize翻譯成核心池大小。

  largestPoolSize只是一個用來起記錄做用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關係。

下面咱們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。

在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然經過submit也能夠提交任務,可是實際上submit方法裏面最終調用的仍是execute()方法,因此咱們只須要研究execute()方法的實現原理便可:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated
 } }

首先是判斷提交的任務command是不是null,如果null,則拋出空指針異常。

接着是這句:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

因爲是或條件運算,若是線程池中的線程數大於等於核心池數,那麼直接進入下面的if語句塊,若是線程池中線程數小於核心池數,則執行後半部分,也就是:

addIfUnderCorePoolSize(command)

若是執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,不然整個方法就直接執行完畢了。

  若是執行完addIfUnderCorePoolSize這個方法返回false,而後接着判斷:

if (runState == RUNNING && workQueue.offer(command))

若是當前線程池處於RUNNING狀態,則將任務放入緩存隊列,若是當前不處於RUNNING或者放入緩存隊列失敗,則執行:

addIfUnderMaximumPoolSize(command)

若是執行addIfUnderMaximumPoolSize失敗,則執行reject方法進行拒絕任務處理。

回到前面:

if (runState == RUNNING && workQueue.offer(command))

這句的執行,若是成功,則進行判斷

if (runState != RUNNING || poolSize == 0)

這句判斷是爲了防止在將此任務添加進任務緩存隊列的同時其餘線程忽然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。若是是這樣就執行:

ensureQueuedTaskHandled(command)

進行應急處理,從名字能夠看出是保證 添加到任務緩存隊列中的任務獲得處理。

咱們接着看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask);        //建立線程去執行firstTask任務 
        } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }

這個是addIfUnderCorePoolSize方法的具體實現,從名字能夠看出它的意圖就是當低於核心吃大小時執行的方法。下面看其具體實現,首先獲取到鎖,由於這地方涉及到線程池狀態的變化,先經過if語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小纔會執行addIfUnderCorePoolSize方法的,爲什麼這地方還要繼續判斷?緣由很簡單,前面的判斷過程當中並無加鎖,所以可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完以後,在其餘線程中又向線程池提交了任務,就可能致使poolSize不小於corePoolSize了,因此須要在這個地方繼續判斷。而後接着判斷線程池的狀態是否爲RUNNING,緣由也很簡單,由於有可能在其餘線程中調用了shutdown或者shutdownNow方法。而後就是執行

t = addThread(firstTask);

這個方法也很是關鍵,傳進去的參數爲提交的任務,返回值爲Thread類型。而後接着在下面判斷t是否爲空,爲空則代表建立線程失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),不然調用t.start()方法啓動線程。

咱們來看一下addThread方法的實現:

private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w);  //建立一個線程,執行任務 
    if (t != null) { w.thread = t;            //將建立的線程的引用賦值爲w的成員變量 
 workers.add(w); int nt = ++poolSize;     //當前線程數加1 
        if (nt > largestPoolSize) largestPoolSize = nt; } return t; }

https://www.cnblogs.com/dolphin0520/p/3932921.html

到這裏,大部分朋友應該對任務提交給線程池以後到被執行的整個過程有了一個基本的瞭解,下面總結一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含義;

  2)其次,要知道Worker是用來起到什麼做用的;(既然Worker實現了Runnable接口,從run方法的實現能夠看出,它首先執行的是經過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask以後,在while循環裏面不斷經過getTask()去取新的任務來執行,那麼去哪裏取呢?天然是從任務緩存隊列裏面去取,getTask是ThreadPoolExecutor類中的方法,並非Worker類中的方法)

  3)要知道任務提交給線程池以後的處理策略,這裏總結一下主要有4點:

  • 若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
  • 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
  • 若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;
  • 若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

3.線程池中線程初始化

  

默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。

  在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:

  • prestartCoreThread():初始化一個核心線程;
  • prestartAllCoreThreads():初始化全部核心線程
  • 下面是這2個方法的實現:
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意傳進去的參數是null
} public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null
        ++n; return n; }

注意上面傳進去的參數是null,根據第2小節的分析可知若是傳進去的參數爲null,則最後執行線程會阻塞在getTask方法中的

1
r = workQueue.take();

   即等待任務隊列中有任務。

4.任務緩存及排隊策略

  

在前面咱們屢次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

  workQueue的類型爲BlockingQueue<Runnable>,一般能夠取下面三種類型:

  1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;

  2)LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;

  3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

5.任務拒絕策略

  當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

6.線程池容量的動態調整

ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設置核心池大小
  • setMaximumPoolSize:設置線程池最大能建立的線程數目大小

  當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。

3、具體使用

不過在java doc中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池:

Executors.newCachedThreadPool();        //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //建立容量爲1的緩衝池
Executors.newFixedThreadPool(int);    //建立固定容量大小的緩衝池

下面是這三個靜態方法的具體實現;

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

  newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置爲1,也使用的LinkedBlockingQueue;

  newCachedThreadPool將corePoolSize設置爲0,將maximumPoolSize設置爲Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立線程運行,當線程空閒超過60秒,就銷燬線程。

  實際中,若是Executors提供的三個靜態方法能知足要求,就儘可能使用它提供的三個方法,由於本身去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

  另外,若是ThreadPoolExecutor達不到要求,能夠本身繼承ThreadPoolExecutor類進行重寫。

4、如何合理配置線程池大小

  

本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。

  通常須要根據任務的類型來配置線程池大小:

  若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1

  若是是IO密集型任務,參考值能夠設置爲2*NCPU

  固然,這只是一個參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。

相關文章
相關標籤/搜索