本文爲 SnailClimb 的原創,目前已經收錄自我開源的 JavaGuide 中(61.5 k Star!【Java學習+面試指南】 一份涵蓋大部分Java程序員所須要掌握的核心知識。以爲內容不錯再 Star!)。html
另外推薦一篇原創:終極推薦!多是最適合你的Java學習路線+方法+網站+書籍推薦!java
synchronized關鍵字解決的是多個線程之間訪問資源的同步性,synchronized關鍵字能夠保證被它修飾的方法或者代碼塊在任意時刻只能有一個線程執行。git
另外,在 Java 早期版本中,synchronized屬於重量級鎖,效率低下,由於監視器鎖(monitor)是依賴於底層的操做系統的 Mutex Lock 來實現的,Java 的線程是映射到操做系統的原生線程之上的。若是要掛起或者喚醒一個線程,都須要操做系統幫忙完成,而操做系統實現線程之間的切換時須要從用戶態轉換到內核態,這個狀態之間的轉換須要相對比較長的時間,時間成本相對較高,這也是爲何早期的 synchronized 效率低的緣由。慶幸的是在 Java 6 以後 Java 官方對從 JVM 層面對synchronized 較大優化,因此如今的 synchronized 鎖效率也優化得很不錯了。JDK1.6對鎖的實現引入了大量的優化,如自旋鎖、適應性自旋鎖、鎖消除、鎖粗化、偏向鎖、輕量級鎖等技術來減小鎖操做的開銷。程序員
synchronized關鍵字最主要的三種使用方式:github
總結: synchronized 關鍵字加到 static 靜態方法和 synchronized(class)代碼塊上都是是給 Class 類上鎖。synchronized 關鍵字加到實例方法上是給對象實例上鎖。儘可能不要使用 synchronized(String a) 由於JVM中,字符串常量池具備緩存功能!面試
下面我以一個常見的面試題爲例講解一下 synchronized 關鍵字的具體使用。spring
面試中面試官常常會說:「單例模式瞭解嗎?來給我手寫一下!給我解釋一下雙重檢驗鎖方式實現單例模式的原理唄!」數據庫
雙重校驗鎖實現對象單例(線程安全)編程
public class Singleton { private volatile static Singleton uniqueInstance; private Singleton() { } public static Singleton getUniqueInstance() { //先判斷對象是否已經實例過,沒有實例化過才進入加鎖代碼 if (uniqueInstance == null) { //類對象加鎖 synchronized (Singleton.class) { if (uniqueInstance == null) { uniqueInstance = new Singleton(); } } } return uniqueInstance; } }
另外,須要注意 uniqueInstance 採用 volatile 關鍵字修飾也是頗有必要。後端
uniqueInstance 採用 volatile 關鍵字修飾也是頗有必要的, uniqueInstance = new Singleton(); 這段代碼實際上是分爲三步執行:
可是因爲 JVM 具備指令重排的特性,執行順序有可能變成 1->3->2。指令重排在單線程環境下不會出現問題,可是在多線程環境下會致使一個線程得到尚未初始化的實例。例如,線程 T1 執行了 1 和 3,此時 T2 調用 getUniqueInstance() 後發現 uniqueInstance 不爲空,所以返回 uniqueInstance,但此時 uniqueInstance 還未被初始化。
使用 volatile 能夠禁止 JVM 的指令重排,保證在多線程環境下也能正常運行。
synchronized 關鍵字底層原理屬於 JVM 層面。
① synchronized 同步語句塊的狀況
public class SynchronizedDemo { public void method() { synchronized (this) { System.out.println("synchronized 代碼塊"); } } }
經過 JDK 自帶的 javap 命令查看 SynchronizedDemo 類的相關字節碼信息:首先切換到類的對應目錄執行 javac SynchronizedDemo.java
命令生成編譯後的 .class 文件,而後執行javap -c -s -v -l SynchronizedDemo.class
。
從上面咱們能夠看出:
synchronized 同步語句塊的實現使用的是 monitorenter 和 monitorexit 指令,其中 monitorenter 指令指向同步代碼塊的開始位置,monitorexit 指令則指明同步代碼塊的結束位置。 當執行 monitorenter 指令時,線程試圖獲取鎖也就是獲取 monitor(monitor對象存在於每一個Java對象的對象頭中,synchronized 鎖即是經過這種方式獲取鎖的,也是爲何Java中任意對象能夠做爲鎖的緣由) 的持有權。當計數器爲0則能夠成功獲取,獲取後將鎖計數器設爲1也就是加1。相應的在執行 monitorexit 指令後,將鎖計數器設爲0,代表鎖被釋放。若是獲取對象鎖失敗,那當前線程就要阻塞等待,直到鎖被另一個線程釋放爲止。
② synchronized 修飾方法的的狀況
public class SynchronizedDemo2 { public synchronized void method() { System.out.println("synchronized 方法"); } }
synchronized 修飾的方法並無 monitorenter 指令和 monitorexit 指令,取得代之的確實是 ACC_SYNCHRONIZED 標識,該標識指明瞭該方法是一個同步方法,JVM 經過該 ACC_SYNCHRONIZED 訪問標誌來辨別一個方法是否聲明爲同步方法,從而執行相應的同步調用。
JDK1.6 對鎖的實現引入了大量的優化,如偏向鎖、輕量級鎖、自旋鎖、適應性自旋鎖、鎖消除、鎖粗化等技術來減小鎖操做的開銷。
鎖主要存在四種狀態,依次是:無鎖狀態、偏向鎖狀態、輕量級鎖狀態、重量級鎖狀態,他們會隨着競爭的激烈而逐漸升級。注意鎖能夠升級不可降級,這種策略是爲了提升得到鎖和釋放鎖的效率。
關於這幾種優化的詳細信息能夠查看筆主的這篇文章:https://gitee.com/SnailClimb/JavaGuide/blob/master/docs/java/Multithread/synchronized.md
① 二者都是可重入鎖
二者都是可重入鎖。「可重入鎖」概念是:本身能夠再次獲取本身的內部鎖。好比一個線程得到了某個對象的鎖,此時這個對象鎖尚未釋放,當其再次想要獲取這個對象的鎖的時候仍是能夠獲取的,若是不可鎖重入的話,就會形成死鎖。同一個線程每次獲取鎖,鎖的計數器都自增1,因此要等到鎖的計數器降低爲0時才能釋放鎖。
② synchronized 依賴於 JVM 而 ReentrantLock 依賴於 API
synchronized 是依賴於 JVM 實現的,前面咱們也講到了 虛擬機團隊在 JDK1.6 爲 synchronized 關鍵字進行了不少優化,可是這些優化都是在虛擬機層面實現的,並無直接暴露給咱們。ReentrantLock 是 JDK 層面實現的(也就是 API 層面,須要 lock() 和 unlock() 方法配合 try/finally 語句塊來完成),因此咱們能夠經過查看它的源代碼,來看它是如何實現的。
③ ReentrantLock 比 synchronized 增長了一些高級功能
相比synchronized,ReentrantLock增長了一些高級功能。主要來講主要有三點:①等待可中斷;②可實現公平鎖;③可實現選擇性通知(鎖能夠綁定多個條件)
ReentrantLock(boolean fair)
構造方法來制定是不是公平的。若是你想使用上述功能,那麼選擇ReentrantLock是一個不錯的選擇。
④ 性能已不是選擇標準
在 JDK1.2 以前,Java的內存模型實現老是從主存(即共享內存)讀取變量,是不須要進行特別的注意的。而在當前的 Java 內存模型下,線程能夠把變量保存本地內存(好比機器的寄存器)中,而不是直接在主存中進行讀寫。這就可能形成一個線程在主存中修改了一個變量的值,而另一個線程還繼續使用它在寄存器中的變量值的拷貝,形成數據的不一致。
要解決這個問題,就須要把變量聲明爲volatile,這就指示 JVM,這個變量是不穩定的,每次使用它都到主存中進行讀取。
說白了, volatile 關鍵字的主要做用就是保證變量的可見性而後還有一個做用是防止指令重排序。
synchronized關鍵字和volatile關鍵字比較
一般狀況下,咱們建立的變量是能夠被任何一個線程訪問並修改的。若是想實現每個線程都有本身的專屬本地變量該如何解決呢? JDK中提供的ThreadLocal
類正是爲了解決這樣的問題。 ThreadLocal
類主要解決的就是讓每一個線程綁定本身的值,能夠將ThreadLocal
類形象的比喻成存放數據的盒子,盒子中能夠存儲每一個線程的私有數據。
若是你建立了一個ThreadLocal
變量,那麼訪問這個變量的每一個線程都會有這個變量的本地副本,這也是ThreadLocal
變量名的由來。他們可使用 get()
和 set()
方法來獲取默認值或將其值更改成當前線程所存的副本的值,從而避免了線程安全問題。
再舉個簡單的例子:
好比有兩我的去寶屋收集寶物,這兩個共用一個袋子的話確定會產生爭執,可是給他們兩我的每一個人分配一個袋子的話就不會出現這樣的問題。若是把這兩我的比做線程的話,那麼ThreadLocal就是用來避免這兩個線程競爭的。
相信看了上面的解釋,你們已經搞懂 ThreadLocal 類是個什麼東西了。
import java.text.SimpleDateFormat; import java.util.Random; public class ThreadLocalExample implements Runnable{ // SimpleDateFormat 不是線程安全的,因此每一個線程都要有本身獨立的副本 private static final ThreadLocal<SimpleDateFormat> formatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd HHmm")); public static void main(String[] args) throws InterruptedException { ThreadLocalExample obj = new ThreadLocalExample(); for(int i=0 ; i<10; i++){ Thread t = new Thread(obj, ""+i); Thread.sleep(new Random().nextInt(1000)); t.start(); } } @Override public void run() { System.out.println("Thread Name= "+Thread.currentThread().getName()+" default Formatter = "+formatter.get().toPattern()); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } //formatter pattern is changed here by thread, but it won't reflect to other threads formatter.set(new SimpleDateFormat()); System.out.println("Thread Name= "+Thread.currentThread().getName()+" formatter = "+formatter.get().toPattern()); } }
Output:
Thread Name= 0 default Formatter = yyyyMMdd HHmm Thread Name= 0 formatter = yy-M-d ah:mm Thread Name= 1 default Formatter = yyyyMMdd HHmm Thread Name= 2 default Formatter = yyyyMMdd HHmm Thread Name= 1 formatter = yy-M-d ah:mm Thread Name= 3 default Formatter = yyyyMMdd HHmm Thread Name= 2 formatter = yy-M-d ah:mm Thread Name= 4 default Formatter = yyyyMMdd HHmm Thread Name= 3 formatter = yy-M-d ah:mm Thread Name= 4 formatter = yy-M-d ah:mm Thread Name= 5 default Formatter = yyyyMMdd HHmm Thread Name= 5 formatter = yy-M-d ah:mm Thread Name= 6 default Formatter = yyyyMMdd HHmm Thread Name= 6 formatter = yy-M-d ah:mm Thread Name= 7 default Formatter = yyyyMMdd HHmm Thread Name= 7 formatter = yy-M-d ah:mm Thread Name= 8 default Formatter = yyyyMMdd HHmm Thread Name= 9 default Formatter = yyyyMMdd HHmm Thread Name= 8 formatter = yy-M-d ah:mm Thread Name= 9 formatter = yy-M-d ah:mm
從輸出中能夠看出,Thread-0已經改變了formatter的值,但仍然是thread-2默認格式化程序與初始化值相同,其餘線程也同樣。
上面有一段代碼用到了建立 ThreadLocal
變量的那段代碼用到了 Java8 的知識,它等於下面這段代碼,若是你寫了下面這段代碼的話,IDEA會提示你轉換爲Java8的格式(IDEA真的不錯!)。由於ThreadLocal類在Java 8中擴展,使用一個新的方法withInitial()
,將Supplier功能接口做爲參數。
private static final ThreadLocal<SimpleDateFormat> formatter = new ThreadLocal<SimpleDateFormat>(){ @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("yyyyMMdd HHmm"); } };
從 Thread
類源代碼入手。
public class Thread implements Runnable { ...... //與此線程有關的ThreadLocal值。由ThreadLocal類維護 ThreadLocal.ThreadLocalMap threadLocals = null; //與此線程有關的InheritableThreadLocal值。由InheritableThreadLocal類維護 ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; ...... }
從上面Thread
類 源代碼能夠看出Thread
類中有一個 threadLocals
和 一個 inheritableThreadLocals
變量,它們都是 ThreadLocalMap
類型的變量,咱們能夠把 ThreadLocalMap
理解爲ThreadLocal
類實現的定製化的 HashMap
。默認狀況下這兩個變量都是null,只有當前線程調用 ThreadLocal
類的 set
或get
方法時才建立它們,實際上調用這兩個方法的時候,咱們調用的是ThreadLocalMap
類對應的 get()
、set()
方法。
ThreadLocal
類的set()
方法
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
經過上面這些內容,咱們足以經過猜想得出結論:最終的變量是放在了當前線程的 ThreadLocalMap
中,並非存在 ThreadLocal
上,ThreadLocal
能夠理解爲只是ThreadLocalMap
的封裝,傳遞了變量值。 ThrealLocal
類中能夠經過Thread.currentThread()
獲取到當前線程對象後,直接經過getMap(Thread t)
能夠訪問到該線程的ThreadLocalMap
對象。
每一個Thread
中都具有一個ThreadLocalMap
,而ThreadLocalMap
能夠存儲以ThreadLocal
爲key的鍵值對。 好比咱們在同一個線程中聲明瞭兩個 ThreadLocal
對象的話,會使用 Thread
內部都是使用僅有那個ThreadLocalMap
存放數據的,ThreadLocalMap
的 key 就是 ThreadLocal
對象,value 就是 ThreadLocal
對象調用set
方法設置的值。ThreadLocal
是 map結構是爲了讓每一個線程能夠關聯多個 ThreadLocal
變量。這也就解釋了 ThreadLocal 聲明的變量爲何在每個線程都有本身的專屬本地變量。
ThreadLocalMap
是ThreadLocal
的靜態內部類。
ThreadLocalMap
中使用的 key 爲 ThreadLocal
的弱引用,而 value 是強引用。因此,若是 ThreadLocal
沒有被外部強引用的狀況下,在垃圾回收的時候,key 會被清理掉,而 value 不會被清理掉。這樣一來,ThreadLocalMap
中就會出現key爲null的Entry。假如咱們不作任何措施的話,value 永遠沒法被GC 回收,這個時候就可能會產生內存泄露。ThreadLocalMap實現中已經考慮了這種狀況,在調用 set()
、get()
、remove()
方法的時候,會清理掉 key 爲 null 的記錄。使用完 ThreadLocal
方法後 最好手動調用remove()
方法
static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
弱引用介紹:
若是一個對象只具備弱引用,那就相似於無關緊要的生活用品。弱引用與軟引用的區別在於:只具備弱引用的對象擁有更短暫的生命週期。在垃圾回收器線程掃描它 所管轄的內存區域的過程當中,一旦發現了只具備弱引用的對象,無論當前內存空間足夠與否,都會回收它的內存。不過,因爲垃圾回收器是一個優先級很低的線程, 所以不必定會很快發現那些只具備弱引用的對象。
弱引用能夠和一個引用隊列(ReferenceQueue)聯合使用,若是弱引用所引用的對象被垃圾回收,Java虛擬機就會把這個弱引用加入到與之關聯的引用隊列中。
池化技術相比你們已經家常便飯了,線程池、數據庫鏈接池、Http 鏈接池等等都是對這個思想的應用。池化技術的思想主要是爲了減小每次獲取資源的消耗,提升對資源的利用率。
線程池提供了一種限制和管理資源(包括執行一個任務)。 每一個線程池還維護一些基本統計信息,例如已完成任務的數量。
這裏借用《Java 併發編程的藝術》提到的來講一下使用線程池的好處:
Runnable
自Java 1.0以來一直存在,但Callable
僅在Java 1.5中引入,目的就是爲了來處理Runnable
不支持的用例。Runnable
接口不會返回結果或拋出檢查異常,可是Callable
接口能夠。因此,若是任務不須要返回結果或拋出異常推薦使用 Runnable
接口,這樣代碼看起來會更加簡潔。
工具類 Executors
能夠實現 Runnable
對象和 Callable
對象之間的相互轉換。(Executors.callable(Runnable task
)或 Executors.callable(Runnable task,Object resule)
)。
Runnable.java
@FunctionalInterface public interface Runnable { /** * 被線程執行,沒有返回值也沒法拋出異常 */ public abstract void run(); }
Callable.java
@FunctionalInterface public interface Callable<V> { /** * 計算結果,或在沒法這樣作時拋出異常。 * @return 計算得出的結果 * @throws 若是沒法計算結果,則拋出異常 */ V call() throws Exception; }
execute()
方法用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功與否;submit()
方法用於提交須要返回值的任務。線程池會返回一個 Future
類型的對象,經過這個 Future
對象能夠判斷任務是否執行成功,而且能夠經過 Future
的 get()
方法來獲取返回值,get()
方法會阻塞當前線程直到任務完成,而使用 get(long timeout,TimeUnit unit)
方法則會阻塞當前線程一段時間後當即返回,這時候有可能任務沒有執行完。咱們以AbstractExecutorService
接口中的一個 submit
方法爲例子來看看源代碼:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
上面方法調用的 newTaskFor
方法返回了一個 FutureTask
對象。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
咱們再來看看execute()
方法:
public void execute(Runnable command) { ... }
《阿里巴巴Java開發手冊》中強制線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險
Executors 返回線程池對象的弊端以下:
- FixedThreadPool 和 SingleThreadExecutor : 容許請求的隊列長度爲 Integer.MAX_VALUE ,可能堆積大量的請求,從而致使OOM。
- CachedThreadPool 和 ScheduledThreadPool : 容許建立的線程數量爲 Integer.MAX_VALUE ,可能會建立大量線程,從而致使OOM。
方式一:經過構造方法實現
方式二:經過Executor 框架的工具類Executors來實現
咱們能夠建立三種類型的ThreadPoolExecutor:
對應Executors工具類中的方法如圖所示:
ThreadPoolExecutor
類中提供的四個構造方法。咱們來看最長的那個,其他三個都是在這個構造方法的基礎上產生(其餘幾個構造方法說白點都是給定某些默認參數的構造方法好比默認制定拒絕策略是什麼),這裏就不貼代碼講了,比較簡單。
/** * 用給定的初始參數建立一個新的ThreadPoolExecutor。 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
下面這些對建立 很是重要,在後面使用線程池的過程當中你必定會用到!因此,務必拿着小本本記清楚。
ThreadPoolExecutor
構造函數重要參數分析ThreadPoolExecutor
3 個最重要的參數:
corePoolSize
: 核心線程數線程數定義了最小能夠同時運行的線程數量。maximumPoolSize
: 當隊列中存放的任務達到隊列容量的時候,當前能夠同時運行的線程數量變爲最大線程數。workQueue
: 當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,若是達到的話,信任就會被存放在隊列中。ThreadPoolExecutor
其餘常見參數:
keepAliveTime
:當線程池中的線程數量大於 corePoolSize
的時候,若是這時沒有新的任務提交,核心線程外的線程不會當即銷燬,而是會等待,直到等待的時間超過了 keepAliveTime
纔會被回收銷燬;unit
: keepAliveTime
參數的時間單位。threadFactory
:executor 建立新線程的時候會用到。handler
:飽和策略。關於飽和策略下面單獨介紹一下。ThreadPoolExecutor
飽和策略ThreadPoolExecutor
飽和策略定義:
若是當前同時運行的線程數量達到最大線程數量而且隊列也已經被放滿了任時,ThreadPoolTaskExecutor
定義一些策略:
ThreadPoolExecutor.AbortPolicy
:拋出 RejectedExecutionException
來拒絕新任務的處理。ThreadPoolExecutor.CallerRunsPolicy
:調用執行本身的線程運行任務。您不會任務請求。可是這種策略會下降對於新任務提交速度,影響程序的總體性能。另外,這個策略喜歡增長隊列容量。若是您的應用程序能夠承受此延遲而且你不能任務丟棄任何一個任務請求的話,你能夠選擇這個策略。ThreadPoolExecutor.DiscardPolicy
: 不處理新任務,直接丟棄掉。ThreadPoolExecutor.DiscardOldestPolicy
: 此策略將丟棄最先的未處理的任務請求。舉個例子: Spring 經過 ThreadPoolTaskExecutor
或者咱們直接經過 ThreadPoolExecutor
的構造函數建立線程池的時候,當咱們不指定 RejectedExecutionHandler
飽和策略的話來配置線程池的時候默認使用的是 ThreadPoolExecutor.AbortPolicy
。在默認狀況下,ThreadPoolExecutor
將拋出 RejectedExecutionException
來拒絕新來的任務 ,這表明你將丟失對這個任務的處理。 對於可伸縮的應用程序,建議使用 ThreadPoolExecutor.CallerRunsPolicy
。當最大池被填滿時,此策略爲咱們提供可伸縮隊列。(這個直接查看 ThreadPoolExecutor
的構造函數源碼就能夠看出,比較簡單的緣由,這裏就不貼代碼了)
Runnable
+ThreadPoolExecutor
爲了讓你們更清楚上面的面試題中的一些概念,我寫了一個簡單的線程池 Demo。
首先建立一個 Runnable
接口的實現類(固然也能夠是 Callable
接口,咱們上面也說了二者的區別。)
MyRunnable.java
import java.util.Date; /** * 這是一個簡單的Runnable類,須要大約5秒鐘來執行其任務。 * @author shuang.kou */ public class MyRunnable implements Runnable { private String command; public MyRunnable(String s) { this.command = s; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date()); processCommand(); System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date()); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return this.command; } }
編寫測試程序,咱們這裏以阿里巴巴推薦的使用 ThreadPoolExecutor
構造函數自定義參數的方式來建立線程池。
ThreadPoolExecutorDemo.java
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemo { private static final int CORE_POOL_SIZE = 5; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; private static final Long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) { //使用阿里巴巴推薦的建立線程池的方式 //經過ThreadPoolExecutor構造函數自定義參數建立 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { //建立WorkerThread對象(WorkerThread類實現了Runnable 接口) Runnable worker = new MyRunnable("" + i); //執行Runnable executor.execute(worker); } //終止線程池 executor.shutdown(); while (!executor.isTerminated()) { } System.out.println("Finished all threads"); } }
能夠看到咱們上面的代碼指定了:
corePoolSize
: 核心線程數爲 5。maximumPoolSize
:最大線程數 10keepAliveTime
: 等待時間爲 1L。unit
: 等待時間的單位爲 TimeUnit.SECONDS。workQueue
:任務隊列爲 ArrayBlockingQueue
,而且容量爲 100;handler
:飽和策略爲 CallerRunsPolicy
。Output:
pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019 pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019 pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019 pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019 pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019
承接 4.6 節,咱們經過代碼輸出結果能夠看出:線程池每次會同時執行 5 個任務,這 5 個任務執行完以後,剩餘的 5 個任務纔會被執行。 你們能夠先經過上面講解的內容,分析一下究竟是咋回事?(本身獨立思考一會)
如今,咱們就分析上面的輸出內容來簡單分析一下線程池原理。
爲了搞懂線程池的原理,咱們須要首先分析一下 execute
方法。在 4.6 節中的 Demo 中咱們使用 executor.execute(worker)
來提交一個任務到線程池中去,這個方法很是重要,下面咱們來看看它的源碼:
// 存放線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int workerCountOf(int c) { return c & CAPACITY; } private final BlockingQueue<Runnable> workQueue; public void execute(Runnable command) { // 若是任務爲null,則拋出異常。 if (command == null) throw new NullPointerException(); // ctl 中保存的線程池當前的一些狀態信息 int c = ctl.get(); // 下面會涉及到 3 步 操做 // 1.首先判斷當前線程池中之行的任務數量是否小於 corePoolSize // 若是小於的話,經過addWorker(command, true)新建一個線程,並將任務(command)添加到該線程中;而後,啓動該線程從而執行任務。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.若是當前之行的任務數量大於等於 corePoolSize 的時候就會走到這裏 // 經過 isRunning 方法判斷線程池狀態,線程池處於 RUNNING 狀態纔會被而且隊列能夠加入任務,該任務纔會被加入進去 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次獲取線程池狀態,若是線程池狀態不是 RUNNING 狀態就須要從任務隊列中移除任務,並嘗試判斷線程是否所有執行完畢。同時執行拒絕策略。 if (!isRunning(recheck) && remove(command)) reject(command); // 若是當前線程池爲空就新建立一個線程並執行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3. 經過addWorker(command, false)新建一個線程,並將任務(command)添加到該線程中;而後,啓動該線程從而執行任務。 //若是addWorker(command, false)執行失敗,則經過reject()執行相應的拒絕策略的內容。 else if (!addWorker(command, false)) reject(command); }
經過下圖能夠更好的對上面這 3 步作一個展現,下圖是我爲了省事直接從網上找到,原地址不明。
如今,讓咱們在回到 4.6 節咱們寫的 Demo, 如今應該是否是很容易就能夠搞懂它的原理了呢?
沒搞懂的話,也不要緊,能夠看看個人分析:
咱們在代碼中模擬了 10 個任務,咱們配置的核心線程數爲 5 、等待隊列容量爲 100 ,因此每次只可能存在 5 個任務同時執行,剩下的 5 個任務會被放到等待隊列中去。當前的 5 個任務之行完成後,纔會之行剩下的 5 個任務。
Atomic 翻譯成中文是原子的意思。在化學上,咱們知道原子是構成通常物質的最小單位,在化學反應中是不可分割的。在咱們這裏 Atomic 是指一個操做是不可中斷的。即便是在多個線程一塊兒執行的時候,一個操做一旦開始,就不會被其餘線程干擾。
因此,所謂原子類說簡單點就是具備原子/原子操做特徵的類。
併發包 java.util.concurrent
的原子類都存放在java.util.concurrent.atomic
下,以下圖所示。
基本類型
使用原子的方式更新基本類型
數組類型
使用原子的方式更新數組裏的某個元素
引用類型
對象的屬性修改類型
AtomicInteger 類經常使用方法
public final int get() //獲取當前的值 public final int getAndSet(int newValue)//獲取當前的值,並設置新的值 public final int getAndIncrement()//獲取當前的值,並自增 public final int getAndDecrement() //獲取當前的值,並自減 public final int getAndAdd(int delta) //獲取當前的值,並加上預期的值 boolean compareAndSet(int expect, int update) //若是輸入的數值等於預期值,則以原子方式將該值設置爲輸入值(update) public final void lazySet(int newValue)//最終設置爲newValue,使用 lazySet 設置以後可能致使其餘線程在以後的一小段時間內仍是能夠讀到舊的值。
AtomicInteger 類的使用示例
使用 AtomicInteger 以後,不用對 increment() 方法加鎖也能夠保證線程安全。
class AtomicIntegerTest { private AtomicInteger count = new AtomicInteger(); //使用AtomicInteger以後,不須要對該方法加鎖,也能夠實現線程安全。 public void increment() { count.incrementAndGet(); } public int getCount() { return count.get(); } }
AtomicInteger 線程安全原理簡單分析
AtomicInteger 類的部分源碼:
// setup to use Unsafe.compareAndSwapInt for updates(更新操做時提供「比較並替換」的做用) private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
AtomicInteger 類主要利用 CAS (compare and swap) + volatile 和 native 方法來保證原子操做,從而避免 synchronized 的高開銷,執行效率大爲提高。
CAS的原理是拿指望的值和本來的一個值做比較,若是相同則更新成新的值。UnSafe 類的 objectFieldOffset() 方法是一個本地方法,這個方法是用來拿到「原來的值」的內存地址,返回值是 valueOffset。另外 value 是一個volatile變量,在內存中可見,所以 JVM 能夠保證任什麼時候刻任何線程總能拿到該變量的最新值。
關於 Atomic 原子類這部分更多內容能夠查看個人這篇文章:併發編程面試必備:JUC 中的 Atomic 原子類總結
AQS的全稱爲(AbstractQueuedSynchronizer),這個類在java.util.concurrent.locks包下面。
AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用普遍的大量的同步器,好比咱們提到的ReentrantLock,Semaphore,其餘的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基於AQS的。固然,咱們本身也能利用AQS很是輕鬆容易地構造出符合咱們本身需求的同步器。
AQS 原理這部分參考了部分博客,在5.2節末尾放了連接。
在面試中被問到併發知識的時候,大多都會被問到「請你說一下本身對於AQS原理的理解」。下面給你們一個示例供你們參加,面試不是背題,你們必定要加入本身的思想,即便加入不了本身的思想也要保證本身可以通俗的講出來而不是背出來。
下面大部份內容其實在AQS類註釋上已經給出了,不過是英語看着比較吃力一點,感興趣的話能夠看看源碼。
AQS核心思想是,若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,而且將共享資源設置爲鎖定狀態。若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。
CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列(虛擬的雙向隊列即不存在隊列實例,僅存在結點之間的關聯關係)。AQS是將每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實現鎖的分配。
看個AQS(AbstractQueuedSynchronizer)原理圖:
AQS使用一個int成員變量來表示同步狀態,經過內置的FIFO隊列來完成獲取資源線程的排隊工做。AQS使用CAS對該同步狀態進行原子操做實現對其值的修改。
private volatile int state;//共享變量,使用volatile修飾保證線程可見性
狀態信息經過protected類型的getState,setState,compareAndSetState進行操做
//返回同步狀態的當前值 protected final int getState() { return state; } // 設置同步狀態的值 protected final void setState(int newState) { state = newState; } //原子地(CAS操做)將同步狀態值設置爲給定值update若是當前同步狀態的值等於expect(指望值) protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS定義兩種資源共享方式
ReentrantReadWriteLock 能夠當作是組合式,由於ReentrantReadWriteLock也就是讀寫鎖容許多個線程同時對某一資源進行讀。
不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源 state 的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。
同步器的設計是基於模板方法模式的,若是須要自定義同步器通常的方式是這樣(模板方法模式很經典的一個應用):
這和咱們以往經過實現接口的方式有很大區別,這是模板方法模式很經典的一個運用。
AQS使用了模板方法模式,自定義同步器時須要重寫下面幾個AQS提供的模板方法:
isHeldExclusively()//該線程是否正在獨佔資源。只有用到condition才須要去實現它。 tryAcquire(int)//獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。 tryRelease(int)//獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。 tryAcquireShared(int)//共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。 tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
默認狀況下,每一個方法都拋出 UnsupportedOperationException
。 這些方法的實現必須是內部線程安全的,而且一般應該簡短而不是阻塞。AQS類中的其餘方法都是final ,因此沒法被其餘類使用,只有這幾個方法能夠被其餘類使用。
以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。
再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS(Compare and Swap)減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。
通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock
。
推薦兩篇 AQS 原理和相關源碼分析的文章:
做者的其餘開源項目推薦: