在實際使用中,線程是很佔用系統資源的,若是對線程管理不完善的話很容易致使系統問題。所以,在大多數併發框架中都會使用線程池來管理線程,使用線程池管理線程主要有以下好處:java
流程圖:數組
▪ 一、好比咱們設置核心線程池的數量爲30個,無論有沒有用戶鏈接,咱們老是保證30個鏈接,這個就是核心線程數,這裏的核心線程數不必定是30你能夠根據你的需求、業務和併發訪問量來設置, 先判斷線程池中核心線程池全部的線程是否都在執行任務 ,若是不是,則新建立一個線程執行剛提交的任務,不然,核心線程池中全部的線程都在執行任務,則進入第2步;緩存
▪ 二、若是咱們核心線程數的30個數量已經滿了,就須要到阻塞隊列中去查看,判斷當前阻塞隊列是否已滿,若是未滿,則將提交的任務放置在阻塞隊列中等待執行;不然,則進入第3步;併發
▪ 三、判斷線程池中全部的線程是否都在執行任務,若是沒有,則建立一個新的線程來執行任務,不然,則交給 飽和策略 進行處理,也叫 拒絕策略 ,等下咱們會有詳細的介紹框架
注意:這裏有一個 核心線程數和一個線程池數量 ,這兩個是不一樣的概念, 核心線程數 表明我可以維護經常使用的線程開銷,而 線程池數量 則表明我最大可以建立的線程數,例如在咱們農村每家每戶都有吃水的井,基本上有 半井深的水 就能夠維持咱們的平常生活的使用,這裏的半井深的水就比如咱們的 核心線程數 ,還有一半的容量是咱們井可以容納的最大水資源了,超過了就不行,水就會漫出來,這個就相似於咱們的 線程池的數量 ,不知道這裏說明你們是否可以更好的進行理解ide
1.newCachedThreadPool: 建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時講重用它們,並在須要時使用提供的ThreadFactory 建立新線程函數
特徵:高併發
(1) 線程池中的數量沒有固定,能夠達到最大值(Integer.MAX_VALUE=2147483647)工具
(2) 線程池中的線程可進行緩存重複利用和回收(回收默認時間爲1分鐘)性能
(3) 當線程池中,沒有可用線程,會從新建立一個線程
2.newFixedThreadPool:建立一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程,在任意點,在大多數nThreads線程會處於處理任務的活動狀態。若是在全部線程處於活動狀態時提交附件任務,則在有可用線程以前,附件任務將在隊列中等待,若是在關閉前的執行期間因爲失敗而致使任何線程終止,那麼一個新線程將代替它執行後續的任務(若是須要)。在某個線程被顯式關閉以前,池中的線程將一直存在
特徵:
(1) 線程池中的線程處於必定的量,能夠很好的控制線程的併發量
(2) 線程能夠重複被使用,在顯示關閉以前,都將一直存在
(3) 超過必定量的線程被提交時需在隊列中等待
3.newSingleThreadExecutor:建立一個使用單個 worker 線程的Executor ,以無界隊列方式來運行該線程。(注意,若是由於在關閉前的執行期間出現失敗而終止了此單個線程,那麼若是須要,一個新線程將代替它執行後續的任務)。可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的,與其餘等效的 newFixedThreadPool(1)
不一樣,可保證無需從新配置此方法所返回的執行程序便可使用其餘的線程
特徵:
(1) 線程池中最多執行一個線程,以後提交的線程將會排在隊列中以此執行
4.newSingleThreadScheduledExecutor:建立一個單線程執行程序,它可安排在給定延遲後運行命令或者按期執行
特徵:
(1) 線程池中最多執行一個線程,以後提交的線程活動將會排在隊列中依次執行
(2) 可定時或者延遲執行線程活動
5.newScheduledThreadPool:建立一個線程池,它可安排在給定延遲後運行命令或者按期的執行
特徵:
(1) 線程池中具備執行數量的線程,即使是空線程也將保留
(2) 可定時或者延遲執行線程活動
6.newWorkStealingPool:建立一個帶並行級別的線程池,並行級別決定了同一時刻最多有多少個線程在執行,如不傳並行級別參數,將默認爲當前系統的CPU個數
咱們能夠在開發工具中搜索一個叫 Executors
的類,在裏面咱們能夠看到咱們上面全部的使用方法
線程工具類——Task :
public class Task implements Runnable{ @Override public void run() { try { //休眠1秒 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //輸出線程名 System.out.println(Thread.currentThread().getName()+"-------running"); } }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public class CacheThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 20; i++) { //提交任務 executorService.execute(new Task()); } //啓動有序關閉,其中先前提交的任務將被執行,但不會接受任何新任務 executorService.shutdown(); } }
結果輸出:
從開始到結束咱們總共輸出了20個( pool-1-thread-1到pool-1-thread-20 )線程
pool-1-thread-2-------running pool-1-thread-6-------running pool-1-thread-1-------running pool-1-thread-3-------running pool-1-thread-5-------running pool-1-thread-4-------running pool-1-thread-7-------running pool-1-thread-11-------running pool-1-thread-9-------running pool-1-thread-10-------running pool-1-thread-17-------running pool-1-thread-15-------running pool-1-thread-18-------running pool-1-thread-16-------running pool-1-thread-8-------running pool-1-thread-20-------running pool-1-thread-13-------running pool-1-thread-19-------running pool-1-thread-14-------running pool-1-thread-12-------running
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public class FixedThreadPoolDemo { public static void main(String[] args) { //建立線程池,最多容許五個線程執行 ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 20; i++) { //提交任務 executorService.execute(new Task()); } //啓動有序關閉,其中先前提交的任務將被執行,但不會接受任何新任務 executorService.shutdown(); } }
輸出結果:
咱們能夠看到其中的線程是每五個( pool-1-thread-1到pool-1-thread-5 )一執行,在當前執行的線程運行中,最多容許五個線程進行執行
pool-1-thread-4-------running pool-1-thread-2-------running pool-1-thread-1-------running pool-1-thread-3-------running pool-1-thread-5-------running pool-1-thread-4-------running pool-1-thread-5-------running pool-1-thread-3-------running pool-1-thread-2-------running pool-1-thread-1-------running pool-1-thread-4-------running pool-1-thread-2-------running pool-1-thread-1-------running pool-1-thread-3-------running pool-1-thread-5-------running pool-1-thread-4-------running pool-1-thread-5-------running pool-1-thread-2-------running pool-1-thread-1-------running pool-1-thread-3-------running
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public class SingleThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 20; i++) { //提交任務 executorService.execute(new Task()); } //啓動有序關閉,其中先前提交的任務將被執行,但不會接受任何新任務 executorService.shutdown(); } }
結果輸出:
咱們能夠看到每次都是線程1輸出結果
pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); // for (int i = 0; i < 20; i++) { System.out.println(System.currentTimeMillis()); scheduledExecutorService.schedule(new Runnable() { @Override public void run() { System.out.println("延遲三秒執行"); System.out.println(System.currentTimeMillis()); } },3, TimeUnit.SECONDS); // } scheduledExecutorService.shutdown(); } }
輸出結果:
1606744468814 延遲三秒執行 1606744471815
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { int i = 1; @Override public void run() { System.out.println(i); i++; } },0,1, TimeUnit.SECONDS); // scheduledExecutorService.shutdown(); }
輸出結果:
通常來講線程池只有兩種狀態,一種是 Running
,一種是 TERMINATED
,圖中間的都是過渡狀態
Running
:能接受新提交的任務,而且也能處理阻塞隊列中的任務
SHUTDOWN
:關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務
STOP
:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程
TIDYING
:若是全部的任務都已終止了,workerCount(有效線程數)爲0.線程池進入該狀態後會調用terminated()方法進入TERMINATED狀態
TERMINATED
:在terminated()方法執行完成後進入該狀態,默認terminated()方法中什麼也沒有作
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize
:核心線程池的大小
maximumPoolSize
:線程池能建立線程的最大個數
keepAliveTime
:空閒線程存活時間
unit
:時間單位,爲keepAliveTime指定時間單位
workQueue
:阻塞隊列,用於保存任務的阻塞隊列
threadFactory
:建立線程的工程類
handler
:飽和策略(拒絕策略)
ArrayBlockingQueue:
基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是-個經常使用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着二者沒法真正並行運行,這點尤爲不一樣於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue徹底能夠採用分離鎖,從而實現生產者和消費者操做的徹底並行運行。Doug Lea之因此沒這樣去作,也許是由於ArrayBlockingQueue的數據寫入和獲取操做已經足夠輕巧,以致於引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上徹底佔不到任何便宜。
ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象。這在長時間內須要高效併發地處理大批量數據的系統中,其對於GC的影響仍是存在必定的區別。而在建立ArrayBlockingQueue時,咱們還能夠控制對象的內部鎖是否採用公平鎖,默認採用非公平鎖。
基於鏈表的阻塞隊列,同ArrayListBlockingQueue相似,其內部也維持着一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時( LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉─份數據,生產者線程會被喚醒,反之對打於消費者這端的處理也基於一樣的原理。而
LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。
DelayQueue中的元素只有當其指定的延遲時間到了,纔可以從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,所以往隊列中插入數據的操做(生產者)永遠不會被阻塞,而只有獲取數據的操做(消費者)纔會被阻塞。
使用場景︰
DelayQueue使用場景較少,但都至關巧妙,常見的例子好比使用一個DelayQueue來管理一個超時未響應的鏈接隊列。
基於優先級的阻塞隊列(優先級的判斷經過構造函數傳入的Compator對象來決定),但須要注意的是
PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。所以使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,不然時間一長,會最終耗盡全部的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是公平鎖。
一種無緩衝的等待隊列,相似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿着產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,若是一方沒有找到合適的目標,那麼對不起,你們都在集市等待。相對於有緩衝的BlockingQueue來講,少了一箇中間經銷商的環節(緩衝區),若是有經銷商,生產者直接把產品批發給經銷商,而無需在乎經銷商最終會將這些產品賣給那些消費者,因爲經銷商能夠庫存一部分商品,所以相對於直接交易模式,整體來講採用中間經銷商的模式會吞吐量高一些(能夠批量買賣)﹔但另外一方面,又由於經銷商的引入,使得產品從生產者到消費者中間增長了額外的交易環節,單個產品的及時響應性能可能會下降。
聲明一個SynchronousQueue有兩種不一樣的方式,它們之間有着不太同樣的行爲。公平模式和非公平模式的區別:若是採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO隊列來阻塞多餘的生產者和消費者,從而體系總體的公平策略;
但若是是非公平模式 ( SynchronousQueue默認) : SynchronousQueue採用非公平鎖,同時配合一個LIFO隊列來管理多餘的生產者和消費者,然後一種模式,若是生產者和消費者的處理速度有差距,則很容易出現飢渴的狀況,便可能有某些生產者或者是消費者的數據永遠都得不處處理。
注意:
arrayblockingqueue和 linkedblockqueue 的區別:1.隊列中鎖的實現不一樣
一、ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即生產和消費用的是同一個鎖;
LinkedBlockingQueue實現的隊列中的鎖是分離的,即生產用的是putLock,消費是takeLock2.隊列大小初始化方式不一樣
二、ArrayBlockingQueue實現的隊列中必須指定隊列的大小;
LinkedBlockingQueue實現的隊列中能夠不指定隊列的大小,可是默認是Integer.MAX_VALUE
ThreadPoolExecutor.AbortPolicy(系統默認):丟棄任務並拋出RejectedExecutionException異常,讓你感知到任務被拒絕了,咱們能夠根據業務邏輯選擇重試或者放棄提交等策略
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常,相對而言存在必定的風險,由於咱們提交的時候根本不知道這個任務會被丟棄,可能形成數據丟失。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程),一般是存活時間最長的任務,它也存在必定的數據丟失風險
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
submit是基方法Executor.execute(Runnable)的延伸,經過建立並返回一個Future類對象可用於取消執行和/或等待完成。
若是你以爲這篇內容對你還蠻有幫助,我想邀請你幫我三個小忙: