線程池java
Java中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序
均可以使用線程池。在開發過程當中,合理地使用線程池可以帶來3個好處。
第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。
第三:提升線程的可管理性。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,
還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。可是,要作到合理利用
線程池,必須對其實現原理了如指掌。程序員
線程池是爲忽然大量爆發的線程設計的,經過有限的幾個固定線程爲大量的操做服務,減小了建立和銷燬線程所需的時間,從而提升效率。算法
若是一個線程的時間很是長,就不必用線程池了(不是不能做長時間操做,而是不宜。),何況咱們還不能控制線程池中線程的開始、掛起、和停止。數據庫
Java是天生就支持併發的語言,支持併發意味着多線程,線程的頻繁建立在高併發及大數據量是很是消耗資源的,由於java提供了線程池。在jdk1.5之前的版本中,線程池的使用是及其簡陋的,可是在JDK1.5後,有了很大的改善。JDK1.5以後加入了java.util.concurrent包,java.util.concurrent包的加入給予開發人員開發併發程序以及解決併發問題很大的幫助。這篇文章主要介紹下併發包下的Executor接口,Executor接口雖然做爲一個很是舊的接口(JDK1.5 2004年發佈),可是不少程序員對於其中的一些原理仍是不熟悉,所以寫這篇文章來介紹下Executor接口,同時鞏固下本身的知識。若是文章中有出現錯誤,歡迎你們指出。緩存
Executor框架的最頂層實現是ThreadPoolExecutor類,Executors工廠類中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其實也只是ThreadPoolExecutor的構造函數參數不一樣而已。經過傳入不一樣的參數,就能夠構造出適用於不一樣應用場景下的線程池,那麼它的底層原理是怎樣實現的呢,這篇就來介紹下ThreadPoolExecutor線程池的運行過程。數據結構
corePoolSize: 核心池的大小。 當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中
maximumPoolSize: 線程池最大線程數,它表示在線程池中最多能建立多少個線程;
keepAliveTime: 表示線程沒有任務執行時最多保持多久時間會終止。
unit: 參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:多線程
Java經過Executors(jdk1.5併發包)提供四種線程池,分別爲:
newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。併發
案例演示:框架
newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。
newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。異步
建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。示例代碼以下:
// 無限大小線程池 jvm自動回收 ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int temp = i; newCachedThreadPool.execute(new Runnable() {
@Override public void run() { try { Thread.sleep(100); } catch (Exception e) { // TODO: handle exception } System.out.println(Thread.currentThread().getName() + ",i:" + temp);
} }); } |
總結: 線程池爲無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程。
建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。示例代碼以下:
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { final int temp = i; newFixedThreadPool.execute(new Runnable() {
@Override public void run() { System.out.println(Thread.currentThread().getId() + ",i:" + temp);
} }); } |
總結:由於線程池大小爲3,每一個任務輸出index後sleep 2秒,因此每兩秒打印3個數字。
定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()
建立一個定長線程池,支持定時及週期性任務執行。延遲執行示例代碼以下:
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5); for (int i = 0; i < 10; i++) { final int temp = i; newScheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("i:" + temp); } }, 3, TimeUnit.SECONDS); } |
表示延遲3秒執行。
建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。示例代碼以下:
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; newSingleThreadExecutor.execute(new Runnable() {
@Override public void run() { System.out.println("index:" + index); try { Thread.sleep(200); } catch (Exception e) { // TODO: handle exception } } }); } |
注意: 結果依次輸出,至關於順序執行各個任務。
提交一個任務到線程池中,線程池的處理流程以下:
一、判斷線程池裏的核心線程是否都在執行任務,若是不是(核心線程空閒或者還有核心線程沒有被建立)則建立一個新的工做線程來執行任務。若是核心線程都在執行任務,則進入下個流程。
二、線程池判斷工做隊列是否已滿,若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏。若是工做隊列滿了,則進入下個流程。
三、判斷線程池裏的線程是否都處於工做狀態,若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。
要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:
任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
任務的優先級:高,中和低。
任務的執行時間:長,中和短。
任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。
任務性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務配置儘量少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則因爲須要等待IO操做,線程並非一直在執行任務,則配置儘量多的線程,如2*Ncpu。混合型的任務,若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。咱們能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。
優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。
執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。
依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。
通常總結哦,有其餘更好的方式,但願各位留言,謝謝。
CPU密集型時,任務能夠少配置線程數,大概和機器的cpu核數至關,這樣可使得每一個線程都在執行任務
IO密集型時,大部分線程都阻塞,故須要多配置線程數,2*cpu核數
操做系統之名稱解釋:
某些進程花費了絕大多數時間在計算上,而其餘則在等待I/O上花費了大可能是時間,
前者稱爲計算密集型(CPU密集型)computer-bound,後者稱爲I/O密集型,I/O-bound。
當多個請求同時操做數據庫時,首先將訂單狀態改成已支付,在金額加上200,在同時併發場景查詢條件下,會形成重複通知。
SQL:
Update
悲觀鎖:悲觀鎖悲觀的認爲每一次操做都會形成更新丟失問題,在每次查詢時加上排他鎖。
每次去拿數據的時候都認爲別人會修改,因此每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會block直到它拿到鎖。傳統的關係型數據庫裏邊就用到了不少這種鎖機制,好比行鎖,表鎖等,讀鎖,寫鎖等,都是在作操做以前先上鎖。
Select * from xxx for update;
樂觀鎖:樂觀鎖會樂觀的認爲每次查詢都不會形成更新丟失,利用版本字段控制
鎖做爲併發共享數據,保證一致性的工具,在JAVA平臺有多種實現(如 synchronized 和 ReentrantLock等等 ) 。這些已經寫好提供的鎖爲咱們開發提供了便利。
重入鎖,也叫作遞歸鎖,指的是同一線程 外層函數得到鎖以後 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響。
在JAVA環境下 ReentrantLock 和synchronized 都是 可重入鎖
package com.itmayiedu; public class Test implements Runnable { public synchronized void get() { System.out.println("name:" + Thread.currentThread().getName() + " get();"); set(); } public synchronized void set() { System.out.println("name:" + Thread.currentThread().getName() + " set();"); } @Override public void run() { get(); } public static void main(String[] args) { Test ss = new Test(); new Thread(ss).start(); new Thread(ss).start(); new Thread(ss).start(); new Thread(ss).start(); } }
package com.itmayiedu; import java.util.concurrent.locks.ReentrantLock; public class Test02 extends Thread { ReentrantLock lock = new ReentrantLock(); public void get() { lock.lock(); System.out.println(Thread.currentThread().getId()); set(); lock.unlock(); } public void set() { lock.lock(); System.out.println(Thread.currentThread().getId()); lock.unlock(); } @Override public void run() { get(); } public static void main(String[] args) { Test ss = new Test(); new Thread(ss).start(); new Thread(ss).start(); new Thread(ss).start(); } }
相比Java中的鎖(Locks in Java)裏Lock實現,讀寫鎖更復雜一些。假設你的程序中涉及到對一些共享資源的讀和寫操做,且寫操做沒有讀操做那麼頻繁。在沒有寫操做的時候,兩個線程同時讀一個資源沒有任何問題,因此應該容許多個線程能在同時讀取共享資源。可是若是有一個線程想去寫這些共享資源,就不該該再有其它線程對該資源進行讀或寫(譯者注:也就是說:讀-讀能共存,讀-寫不能共存,寫-寫不能共存)。這就須要一個讀/寫鎖來解決這個問題。Java5在java.util.concurrent包中已經包含了讀寫鎖。儘管如此,咱們仍是應該瞭解其實現背後的原理。
package com.itmayiedu; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Cache { static Map<String, Object> map = new HashMap<String, Object>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); // 獲取一個key對應的value public static final Object get(String key) { r.lock(); try { System.out.println("正在作讀的操做,key:" + key + " 開始"); Thread.sleep(100); Object object = map.get(key); System.out.println("正在作讀的操做,key:" + key + " 結束"); System.out.println(); return object; } catch (InterruptedException e) { } finally { r.unlock(); } return key; } // 設置key對應的value,並返回舊有的value public static final Object put(String key, Object value) { w.lock(); try { System.out.println("正在作寫的操做,key:" + key + ",value:" + value + "開始."); Thread.sleep(100); Object object = map.put(key, value); System.out.println("正在作寫的操做,key:" + key + ",value:" + value + "結束."); System.out.println(); return object; } catch (InterruptedException e) { } finally { w.unlock(); } return value; } // 清空全部的內容 public static final void clear() { w.lock(); try { map.clear(); } finally { w.unlock(); } } public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 10; i++) { Cache.put(i + "", i + ""); } } }).start(); new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 10; i++) { Cache.get(i + ""); } } }).start(); } }
(1)與鎖相比,使用比較交換(下文簡稱CAS)會使程序看起來更加複雜一些。但因爲其非阻塞性,它對死鎖問題天生免疫,而且,線程間的相互影響也遠遠比基於鎖的方式要小。更爲重要的是,使用無鎖的方式徹底沒有鎖競爭帶來的系統開銷,也沒有線程間頻繁調度帶來的開銷,所以,它要比基於鎖的方式擁有更優越的性能。
(2)無鎖的好處:
第一,在高併發的狀況下,它比有鎖的程序擁有更好的性能;
第二,它天生就是死鎖免疫的。
就憑藉這兩個優點,就值得咱們冒險嘗試使用無鎖的併發。
(3)CAS算法的過程是這樣:它包含三個參數CAS(V,E,N): V表示要更新的變量,E表示預期值,N表示新值。僅當V值等於E值時,纔會將V的值設爲N,若是V值和E值不一樣,則說明已經有其餘線程作了更新,則當前線程什麼都不作。最後,CAS返回當前V的真實值。
(4)CAS操做是抱着樂觀的態度進行的,它老是認爲本身能夠成功完成操做。當多個線程同時使用CAS操做一個變量時,只有一個會勝出,併成功更新,其他均會失敗。失敗的線程不會被掛起,僅是被告知失敗,而且容許再次嘗試,固然也容許失敗的線程放棄操做。基於這樣的原理,CAS操做即便沒有鎖,也能夠發現其餘線程對當前線程的干擾,並進行恰當的處理。
(5)簡單地說,CAS須要你額外給出一個指望值,也就是你認爲這個變量如今應該是什麼樣子的。若是變量不是你想象的那樣,那說明它已經被別人修改過了。你就從新讀取,再次嘗試修改就行了。
(6)在硬件層面,大部分的現代處理器都已經支持原子化的CAS指令。在JDK 5.0之後,虛擬機即可以使用這個指令來實現併發操做和併發數據結構,而且,這種操做在虛擬機中能夠說是無處不在。
/** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { for (;;) { //獲取當前值 int current = get(); //設置指望值 int next = current + 1; //調用Native方法compareAndSet,執行CAS操做 if (compareAndSet(current, next)) //成功後纔會返回指望值,不然無線循環 return next; } } |
自旋鎖是採用讓當前線程不停地的在循環體內執行實現的,當循環的條件被其餘線程改變時 才能進入臨界區。以下
package com.itmayiedu; import java.util.concurrent.atomic.AtomicReference; public class SpinLock { private AtomicReference<Thread> sign = new AtomicReference<>(); public void lock() { Thread current = Thread.currentThread(); while (!sign.compareAndSet(null, current)) { } } public void unlock() { Thread current = Thread.currentThread(); sign.compareAndSet(current, null); } }
package com.itmayiedu; public class Test implements Runnable { static int sum; private SpinLock lock; public Test(SpinLock lock) { this.lock = lock; } public static void main(String[] args) throws InterruptedException { SpinLock lock = new SpinLock(); for (int i = 0; i < 100; i++) { Test test = new Test(lock); Thread t = new Thread(test); t.start(); } Thread.currentThread().sleep(1000); System.out.println(sum); } @Override public void run() { this.lock.lock(); this.lock.lock(); sum++; this.lock.unlock(); this.lock.unlock(); } }
當一個線程 調用這個不可重入的自旋鎖去加鎖的時候沒問題,當再次調用lock()的時候,由於自旋鎖的持有引用已經不爲空了,該線程對象會誤認爲是別人的線程持有了自旋鎖
使用了CAS原子操做,lock函數將owner設置爲當前線程,而且預測原來的值爲空。unlock函數將owner設置爲null,而且預測值爲當前線程。
當有第二個線程調用lock操做時因爲owner值不爲空,致使循環一直被執行,直至第一個線程調用unlock函數將owner設置爲null,第二個線程才能進入臨界區。
因爲自旋鎖只是將當前線程不停地執行循環體,不進行線程狀態的改變,因此響應速度更快。但當線程數不停增長時,性能降低明顯,由於每一個線程都須要執行,佔用CPU時間。若是線程競爭不激烈,而且保持鎖的時間段。適合使用自旋鎖。
若是想在不一樣的jvm中保證數據同步,使用分佈式鎖技術。
有數據庫實現、緩存實現、Zookeeper分佈式鎖