從一個簡單的Java單例示例談談併發

一個簡單的單例示例

單例模式多是你們常常接觸和使用的一個設計模式,你可能會這麼寫html

public class UnsafeLazyInitiallization {    private static UnsafeLazyInitiallization instance;    private UnsafeLazyInitiallization() {
    }    public static UnsafeLazyInitiallization getInstance(){        if(instance==null){   //1:A線程執行
            instance=new UnsafeLazyInitiallization();  //2:B線程執行
        }        return instance;
    }
}

上面代碼你們應該都知道,所謂的線程不安全的懶漢單例寫法。在UnsafeLazyInitiallization類中,假設A線程執行代碼1的同時,B線程執行代碼2,此時,線程A可能看到instance引用的對象尚未初始化。java

你可能會說,線程不安全,我能夠對getInstance()方法作同步處理保證安全啊,好比下面這樣的寫法node

 public class SafeLazyInitiallization {        private static SafeLazyInitiallization instance;        private SafeLazyInitiallization() {
        }        public synchronized static SafeLazyInitiallization getInstance(){            if(instance==null){
                instance=new SafeLazyInitiallization();
            }            return instance;
        }
    }

這樣的寫法是保證了線程安全,可是因爲getInstance()方法作了同步處理,synchronized將致使性能開銷。如getInstance()方法被多個線程頻繁調用,將會致使程序執行性能的降低。反之,若是getInstance()方法不會被多個線程頻繁的調用,那麼這個方案將可以提供使人滿意的性能。linux

那麼,有沒有更優雅的方案呢?前人的智慧是偉大的,在早期的JVM中,synchronized存在巨大的性能開銷,所以,人們想出了一個「聰明」的技巧——雙重檢查鎖定。人們經過雙重檢查鎖定來下降同步的開銷。下面來讓咱們看看c++

public class DoubleCheckedLocking { //1
    private static DoubleCheckedLocking instance; //2
    private DoubleCheckedLocking() {
    }    public static DoubleCheckedLocking getInstance() { //3
        if (instance == null) { //4:第一次檢查
            synchronized (DoubleCheckedLocking.class) { //5:加鎖
                if (instance == null) //6:第二次檢查
                    instance = new DoubleCheckedLocking(); //7:問題的根源出在這裏
            } //8
        } //9
        return instance; //10
    } //11}

如上面代碼所示,若是第一次檢查instance不爲null,那麼就不須要執行下面的加鎖和初始化操做。所以,能夠大幅下降synchronized帶來的性能開銷。雙重檢查鎖定看起來彷佛很完美,但這是一個錯誤的優化!爲何呢?在線程執行到第4行,代碼讀取到instance不爲null時,instance引用的對象有可能尚未完成初始化。在第7行建立了一個對象,這行代碼能夠分解爲以下的3行僞代碼程序員

memory=allocate(); //1:分配對象的內存空間ctorInstance(memory); //2:初始化對象instance=memory; //3:設置instance指向剛分配的內存地址

上面3行代碼中的2和3之間,可能會被重排序(在一些JIT編譯器上,這種重排序是真實發生的,若是不瞭解重排序,後文JMM會詳細解釋)。2和3之間重排序以後的執行時序以下算法

memory=allocate(); //1:分配對象的內存空間instance=memory; //3:設置instance指向剛分配的內存地址,注意此時對象尚未被初始化ctorInstance(memory); //2:初始化對象

回到示例代碼第7行,若是發生重排序,另外一個併發執行的線程B就有可能在第4行判斷instance不爲null。線程B接下來將訪問instance所引用的對象,但此時這個對象可能尚未被A線程初始化。在知曉問題發生的根源以後,咱們能夠想出兩個辦法解決編程

  • 不容許2和3重排序swift

  • 容許2和3重排序,但不容許其餘線程「看到」這個重排序windows

下面就介紹這兩個解決方案的具體實現

基於Volatile的解決方案

對於前面的基於雙重檢查鎖定的方案,只須要作一點小的修改,就能夠實現線程安全的延遲初始化。請看下面的示例代碼

public class SafeDoubleCheckedLocking {    private volatile static SafeDoubleCheckedLocking instance;    private SafeDoubleCheckedLocking() {
    }    public static SafeDoubleCheckedLocking getInstance() {        if (instance == null) {            synchronized (SafeDoubleCheckedLocking.class) {                if (instance == null)

                    instance = new SafeDoubleCheckedLocking();//instance爲volatile,如今沒問題了

            }

        }        return instance;

    }

}

當聲明對象的引用爲volatile後,前面僞代碼談到的2和3之間的重排序,在多線程環境中將會被禁止。

基於類初始化的解決方案

JVM在類的初始化階段(即在Class被加載後,且被線程使用以前),會執行類的初始化。在執行類的初始化期間,JVM會去獲取多個線程對同一個類的初始化。基於這個特性,實現的示例代碼以下

public class InstanceFactory {    private InstanceFactory() {
    }    private static class InstanceHolder {        public static InstanceFactory instance = new InstanceFactory();

    }    public static InstanceFactory getInstance() {        return InstanceHolder.instance; //這裏將致使InstanceHolder類被初始化

    }

}

這個方案的本質是容許前面僞代碼談到的2和3重排序,但不容許其餘線程「看到」這個重排序。在InstanceFactory示例代碼中,首次執行getInstance()方法的線程將致使InstanceHolder類被初始化。因爲Java語言是多線程的,多個線程可能在同一時間嘗試去初始化同一個類或接口(好比這裏多個線程可能會在同一時刻調用getInstance()方法來初始化IInstanceHolder類)。Java語言規定,對於每個類和接口C,都有一個惟一的初始化鎖LC與之對應。從C到LC的映射,由JVM的具體實現去自由實現。JVM在類初始化期間會獲取這個初始化鎖,而且每一個線程至少獲取一次鎖來確保這個類已經被初始化過了。

JMM

也許你還存在疑問,前面談的重排序是什麼鬼?爲何volatile在某方面就能禁止重排序?如今引出本文的另外一個話題JMM(Java Memory Model——Java內存模型)。什麼是JMM呢?JMM是一個抽象概念,它並不存在。Java虛擬機規範中試圖定義一種Java內存模型(JMM)來屏蔽掉各類硬件和操做系統的內存訪問差別,以實現讓Java程序在各類平臺下都能達到一致的內存訪問效果。在此以前,主流程序語言(如C/C++等)直接使用物理硬件和操做系統的內存模型,所以,會因爲不一樣平臺的內存模型的差別,有可能致使程序在一套平臺上併發徹底正常,而在另外一套平臺上併發訪問卻常常出錯,所以在某些場景就必須針對不一樣的平臺來編寫程序。

Java線程之間的通訊由JMM來控制,JMM決定一個線程共享變量的寫入什麼時候對另外一個線程可見。JMM保證若是程序是正確同步的,那麼程序的執行將具備順序一致性。從抽象的角度看,JMM定義了線程和主內存之間的抽象關係:線程之間的共享變量(實例域、靜態域和數據元素)存儲在主內存(Main Memory)中,每一個線程都有一個私有的本地內存(Local Memory),本地內存中存儲了該線程以讀/寫共享變量的副本(局部變量、方法定義參數和異常處理參數是不會在線程之間共享,它們存儲在線程的本地內存中)。從物理角度上看,主內存僅僅是虛擬機內存的一部分,與物理硬件的主內存名字同樣,二者能夠互相類比;而本地內存,可與處理器高速緩存類比。Java內存模型的抽象示意圖如圖所示

enter p_w_picpath description here

這裏先介紹幾個基礎概念:8種操做指令、內存屏障、順序一致性模型、as-if-serial、happens-before 、數據依賴性、 重排序。

8種操做指令

關於主內存與本地內存之間具體的交互協議,即一個變量如何從主內存拷貝到本地內存、如何從本地內存同步回主內存之類的實現細節,JMM中定義瞭如下8種操做來完成,虛擬機實現時必須保證下面說起的每種操做都是原子的、不可再分的(對於double和long類型的遍從來說,load、store、read和write操做在某些平臺上容許有例外):

  • lock(鎖定):做用於主內存的變量,它把一個變量標識爲一條線程獨立的狀態。

  • unlock(解鎖):做用於主內存的變量,它把一個處於鎖定狀態的變量釋放出來,釋放後的變量才能夠被其餘線程鎖定。

  • read(讀取):做用於主內存的變量,它把一個變量的值從主內存傳輸到線程的本地內存中,以便隨後的load動做使用。

  • load(載入):做用於本地內存的變量,它把read操做從主內存中獲得變量值放入本地內存的變量副本中。

  • use(使用):做用於本地內存的變量,它把本地內存中一個變量的值傳遞給執行引擎,每當虛擬機遇到一個須要使用到變量的值的字節碼指令時將會執行這個操做。

  • assign(賦值):做用於本地內存的變量,它把一個從執行引擎接收到的值賦給本地內存的變量,每當虛擬機遇到一個給變量賦值的字節碼指令時執行這個操做。

  • store(存儲):做用於本地內存的變量,它把本地內存中的一個變量的值傳送到主內存中,以便隨後的write操做使用。

  • write(寫入):做用於主內存的變量,它把store操做從本地內存中提到的變量的值放入到主內存的變量中。

若是要把一個變量從主內存模型複製到本地內存,那就要順序的執行read和load操做,若是要把變量從本地內存同步回主內存,就要順序的執行store和write操做。注意,Java內存模型只要求上述兩個操做必須按順序執行,而沒有保證是連續執行。也就是說read與load之間、store與write之間是可插入其餘指令的,如對主內存中的變量a、b進行訪問時,一種可能出現的順序是read a read b、load b、load a。

內存屏障

內存屏障是一組處理器指令(前面的8個操做指令),用於實現對內存操做的順序限制。包括LoadLoad, LoadStore, StoreLoad, StoreStore共4種內存屏障。內存屏障存在的意義是什麼呢?它是在Java編譯器生成指令序列的適當位置插入內存屏障指令來禁止特定類型的處理器重排序,從而讓程序按咱們預想的流程去執行,內存屏障是與相應的內存重排序相對應的。JMM把內存屏障指令分爲4類

enter p_w_picpath description here

StoreLoad Barriers是一個「全能型 」的屏障,它同時具備其餘3個屏障的效果。如今的多數處理器大多支持該屏障(其餘類型的屏障不必定被全部處理器支持)。執行該屏障開銷會很昂貴,由於當前處理器一般要把寫緩衝區中的數據所有刷新到內存中。

數據依賴性

若是兩個操做訪問同一個變量,且這兩個操做中有一個爲寫操做,此時這兩個操做之間就存在數據依賴性。數據依賴性分3種類型:寫後讀、寫後寫、讀後寫。這3種狀況,只要重排序兩個操做的執行順序,程序的執行結果就會被改變。編譯器和處理器可能對操做進行重排序。而它們進行重排序時,會遵照數據依賴性,不會改變數據依賴關係的兩個操做的執行順序。

這裏所說的數據依賴性僅針對單個處理器中執行的指令序列和單個線程中執行的操做,不一樣處理器之間和不一樣線程之間的數據依賴性不被編譯器和處理器考慮。

順序一致性內存模型

順序一致性內存模型是一個理論參考模型,在設計的時候,處理器的內存模型和編程語言的內存模型都會以順序一致性內存模型做爲參照。它有兩個特性:

  • 一個線程中的全部操做必須按照程序的順序來執行

  • (無論程序是否同步)全部線程都只能看到一個單一的操做執行順序。在順序一致性的內存模型中,每一個操做必須原子執行而且馬上對全部線程可見。

從順序一致性模型中,咱們能夠知道程序全部操做徹底按照程序的順序串行執行。而在JMM中,臨界區內的代碼能夠重排序(但JMM不容許臨界區內的代碼「逸出」到臨界區外,那樣就破壞監視器的語義)。JMM會在退出臨界區和進入臨界區這兩個關鍵時間點作一些特別處理,使得線程在這兩個時間點具備與順序一致性模型相同的內存視圖。雖然線程A在臨界區內作了重排序,但因爲監視器互斥執行的特性,這裏的線程B根本沒法「觀察」到線程A在臨界區內的重排序。這種重排序既提升了執行效率,又沒有改變程序的執行結果。像前面單例示例的類初始化解決方案就是採用了這個思想。

as-if-serial

as-if-serial的意思是無論怎麼重排序,(單線程)程序的執行結果不能改變。編譯器、runtime和處理器都必須遵照as-if-serial語義。爲了遵照as-if-serial語義,編譯器和處理器不會對存在數據依賴關係的操做作重排序。

as-if-serial語義把單線程程序保護了起來,遵照as-if-serial語義的編譯器、runtime和處理器共同爲編寫單線程程序的程序員建立了一個幻覺:單線程程序是按程序的順序來執行的。as-if-serial語義使單線程程序員無需擔憂重排序會干擾他們,也無需擔憂內存可見性問題。

happens-before

happens-before是JMM最核心的概念。從JDK5開始,Java使用新的JSR-133內存模型,JSR-133 使用happens-before的概念闡述操做之間的內存可見性,若是一個操做執行的結果須要對另外一個操做可見,那麼這兩個操做之間必須存在happens-before關係。

happens-before規則以下:

  • 程序次序法則:線程中的每一個動做 A 都 happens-before 於該線程中的每個動做 B,其中,在程序中,全部的動做 B 都出如今動做 A 以後。(注:此法則只是要求遵循 as-if-serial語義)

  • 監視器鎖法則:對一個監視器鎖的解鎖 happens-before 於每個後續對同一監視器鎖的加鎖。(顯式鎖的加鎖和解鎖有着與內置鎖,即監視器鎖相同的存儲語意。)

  • volatile變量法則:對 volatile 域的寫入操做 happens-before 於每個後續對同一域的讀操做。(原子變量的讀寫操做有着與 volatile 變量相同的語意。)(volatile變量具備可見性和讀寫原子性。)

  • 線程啓動法則:在一個線程裏,對 Thread.start 的調用會 happens-before 於每個啓動線程中的動做。 線程終止法則:線程中的任何動做都 happens-before 於其餘線程檢測到這個線程已終結,或者從 Thread.join 方法調用中成功返回,或者 Thread.isAlive 方法返回false。

  • 中斷法則法則:一個線程調用另外一個線程的 interrupt 方法 happens-before 於被中斷線程發現中斷(經過拋出InterruptedException, 或者調用 isInterrupted 方法和 interrupted 方法)。

  • 終結法則:一個對象的構造函數的結束 happens-before 於這個對象 finalizer 開始。

  • 傳遞性:若是 A happens-before 於 B,且 B happens-before 於 C,則 A happens-before 於 C。

happens-before與JMM的關係以下圖所示

enter p_w_picpath description here

as-if-serial語義和happens-before本質上同樣,參考順序一致性內存模型的理論,在不改變程序執行結果的前提下,給編譯器和處理器以最大的自由度,提升並行度。

重排序

終於談到咱們反覆說起的重排序了,重排序是指編譯器和處理器爲了優化程序性能而對指令序列進行從新排序的一種手段。重排序分3種類型。

  • 編譯器優化的重排序。編譯器在不改變單線程程序語義(as-if-serial )的前提下,能夠從新安排語句的執行順序。

  • 指令級並行的重排序。現代處理器採用了指令級並行技術(Instruction Level Parallelism,ILP)來將多條指令重疊執行。若是不存在數據依賴性,處理器能夠改變語句對機器指令的執行順序。

  • 內存系統的重排序。因爲處理器使用緩存和讀/寫緩衝區,這使得加載和存儲操做看上去多是在亂序執行。

從Java源代碼到最終實際執行的指令序列,會分別經歷下面3種重排序

enter p_w_picpath description here

上述的1屬於編譯器重排序,2和3屬於處理器重排序。這些重排序可能會致使多線程程序出現內存可見性問題。對於編譯器,JMM的編譯器重排序規則會禁止特定類型的編譯器重排序(不是全部的編譯器重排序都要禁止)。對於處理器重排序,JMM的處理器重排序規則會要求Java編譯器在生成指令序列時,插入特定類型的內存屏障指令,經過內存屏障指令來禁止特定類型的處理器重排序。

JMM屬於語言級的內存模型,它確保在不一樣的編譯器和不一樣的處理器平臺之上,經過禁止特定類型的編譯器重排序和處理器重排序,爲程序員提供一致的內存可見性保證。

從JMM設計者的角度來講,在設計JMM時,須要考慮兩個關鍵因素:

  • 程序員對內存模型的使用。程序員但願內存模型易於理解,易於編程。程序員但願基於一個強內存模型(程序儘量的順序執行)來編寫代碼。

  • 編譯器和處理器對內存模型的實現。編譯器和處理器但願內存模型對它們的束縛越少越好,這樣它們就能夠作儘量多的優化(對程序重排序,作儘量多的併發)來提升性能。編譯器和處理器但願實現一個弱內存模型。

JMM設計就須要在這二者之間做出協調。JMM對程序採起了不一樣的策略:

  • 對於會改變程序執行結果的重排序,JMM要求編譯器和處理器必須禁止這種重排序。

  • 對於不會改變程序執行結果的重排序,JMM對編譯器和處理器不做要求(JMM容許這種重排序)。

介紹完了這幾個基本概念,咱們不難推斷出JMM是圍繞着在併發過程當中如何處理原子性、可見性和有序性這三個特徵來創建的:

  • 原子性:由Java內存模型來直接保證的原子性操做就是咱們前面介紹的8個原子操做指令,其中lock(lock指令實際在處理器上原子操做體現對總線加鎖或對緩存加鎖)和unlock指令操做JVM並未直接開放給用戶使用,可是卻提供了更高層次的字節碼指令monitorenter和monitorexit來隱式使用這兩個操做,這兩個字節碼指令反映到Java代碼中就是同步塊——synchronize關鍵字,所以在synchronized塊之間的操做也具有原子性。除了synchronize,在Java中另外一個實現原子操做的重要方式是自旋CAS,它是利用處理器提供的cmpxchg指令實現的。至於自旋CAS後面J.U.C中會詳細介紹,它和volatile是整個J.U.C底層實現的核心。

  • 可見性:可見性是指一個線程修改了共享變量的值,其餘線程可以當即得知這個修改。而咱們上文談的happens-before原則禁止某些處理器和編譯器的重排序,來保證了JMM的可見性。而體如今程序上,實現可見性的關鍵字包含了volatile、synchronize和final。

  • 有序性:談到有序性就涉及到前面說的重排序和順序一致性內存模型。咱們也都知道了as-if-serial是針對單線程程序有序的,即便存在重排序,可是最終程序結果仍是不變的,而多線程程序的有序性則體如今JMM經過插入內存屏障指令,禁止了特定類型處理器的重排序。經過前面8個操做指令和happens-before原則介紹,也不難推斷出,volatile和synchronized兩個關鍵字來保證線程之間的有序性,volatile自己就包含了禁止指令重排序的語義,而synchronized則是由監視器法則得到。

J.U.C

談完了JMM,那麼Java相關類庫是如何實現的呢?這裏就談談J.U.C( java.util.concurrent),先來張J.U.C的思惟導圖

enter p_w_picpath description here

不難看出,J.U.C由atomic、locks、tools、collections、Executor這五部分組成。它們的實現基於volatile的讀寫和CAS所具備的volatile讀和寫。AQS(AbstractQueuedSynchronizer,隊列同步器)、非阻塞數據結構和原子變量類,這些J.U.C中的基礎類都是使用了這種模式實現的,而J.U.C中的高層類又依賴於這些基礎類來實現的。從總體上看,J.U.C的實現示意圖以下

enter p_w_picpath description here

也許你對volatile和CAS的底層實現原理不是很瞭解,這裏先這裏先簡單介紹下它們的底層實現

volatile

Java語言規範第三版對volatile的定義爲:Java編程語言容許線程訪問共享變量,爲了確保共享變量能被準確和一致性的更新,線程應該確保經過排他鎖單獨得到這個變量。若是一個字段被聲明爲volatile,Java內存模型確保這個全部線程看到這個值的變量是一致的。而volatile是如何來保證可見性的呢?若是對聲明瞭volatile的變量進行寫操做,JVM就會向處理器發送一條Lock前綴的指令,將這個變量所在緩存行的數據寫回到系統內存(Lock指令會在聲言該信號期間鎖總線/緩存,這樣就獨佔了系統內存)。可是,就算是寫回到內存,若是其餘處理器緩存的值仍是舊的,再執行計算操做就會有問題。因此,在多處理器下,爲了保證各個處理器的緩存是一致的,就會實現緩存一致性協議,每一個處理器經過嗅探在總線(注意處理器不直接跟系統內存交互,而是經過總線)上傳播的數據來檢查本身緩存的值是否是過時了,當處理器發現直接緩存行對應的內存地址被修改,就會將當前處理器的緩存行設置成無效狀態,當處理器對這個數據進行修改操做的時候,會從新從系統內存中把數據讀處處理器緩存裏。

CAS

CAS其實應用挺普遍的,咱們經常聽到的悲觀鎖樂觀鎖的概念,樂觀鎖(無鎖)指的就是CAS。這裏只是簡單說下在併發的應用,所謂的樂觀併發策略,通俗的說,就是先進性操做,若是沒有其餘線程爭用共享數據,那操做就成功了,若是共享數據有爭用,產生了衝突,那就採起其餘的補償措施(最多見的補償措施就是不斷重試,治到成功爲止,這裏其實也就是自旋CAS的概念),這種樂觀的併發策略的許多實現都不須要把線程掛起,所以這種操做也被稱爲非阻塞同步。而CAS這種樂觀併發策略操做和衝突檢測這兩個步驟具有的原子性,是靠什麼保證的呢?硬件,硬件保證了一個從語義上看起來須要屢次操做的行爲只經過一條處理器指令就能完成。

也許你會存在疑問,爲何這種無鎖的方案通常會比直接加鎖效率更高呢?這裏其實涉及到線程的實現和線程的狀態轉換。實現線程主要有三種方式:使用內核線程實現、使用用戶線程實現和使用用戶線程加輕量級進程混合實現。而Java的線程實現則依賴於平臺使用的線程模型。至於狀態轉換,Java定義了6種線程狀態,在任意一個時間點,一個線程只能有且只有其中的一種狀態,這6種狀態分別是:新建、運行、無限期等待、限期等待、阻塞、結束。 Java的線程是映射到操做系統的原生線程之上的,若是要阻塞或喚醒一個線程,都須要操做系統來幫忙完成,這就須要從用戶態轉換到核心態中,所以狀態轉換須要耗費不少的處理器時間。對於簡單的同步塊(被synchronized修飾的方法),狀態轉換消耗的時間可能比用戶代碼執行的時間還要長。因此出現了這種優化方案,在操做系統阻塞線程之間引入一段自旋過程或一直自旋直到成功爲止。避免頻繁的切入到核心態之中。

可是這種方案其實也並不完美,在這裏就說下CAS實現原子操做的三大問題

  • ABA問題。由於CAS須要在操做值的時候,檢查值有沒有變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有變化,可是實際上發生變化了。ABA解決的思路是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加1。JDK的Atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。不過目前來講這個類比較「雞肋」,大部分狀況下ABA問題不會影響程序併發的正確性,若是須要解決ABA問題,改用原來的互斥同步可能會比原子類更高效。

  • 循環時間長開銷大。自旋CAS若是長時間不成功,會給CPU帶來很是大的執行開銷。因此說若是是長時間佔用鎖執行的程序,這種方案並不適用於此。

  • 只能保證一個共享變量的原子操做。當對一個共享變量執行操做時,咱們可使用自旋CAS來保證原子性,可是對多個共享變量的操做時,自旋CAS就沒法保證操做的原子性,這個時候能夠用鎖。

談完了這兩個概念,下面咱們就來逐個分析這五部分的具體源碼實現

atomic

atomic包的原子操做類提供了一種簡單、性能高效、線程安全操做一個變量的方式。atomic包裏一共13個類,屬於4種類型的原子更新方式,分別是原子更新基本類型、原子更新數組、原子更新引用、原子更新屬性。atomic包裏的類基本使用Unsafe實現的包裝類。

下面經過一個簡單的CAS方式實現計數器(一個線程安全的計數器方法safeCount和一個非線程安全的計數器方法count)的示例來講下

public class CASTest {    public static void main(String[] args){
        final Counter cas=new Counter();
        List ts=new ArrayList(600);        long start=System.currentTimeMillis();        for(int j=0;j<100;j++){
            Thread t=new Thread(new Runnable() {
                @Override                public void run() {                    for(int i=0;i<10000;i++){
                        cas.count();
                        cas.safeCount();
                    }
                }
            });
            ts.add(t);
        }        for(Thread t:ts){
            t.start();
        }        for(Thread t:ts){            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(cas.i);
        System.out.println(cas.atomicI.get());
        System.out.println(System.currentTimeMillis()-start);
    }

}public class Counter {    public AtomicInteger atomicI=new AtomicInteger(0);    public int i=0;    /**
    * 使用CAS實現線程安全計數器
    */
    public void safeCount(){        for(;;){            int i=atomicI.get();
            boolean suc=atomicI.compareAndSet(i,++i);            if(suc){                break;
            }
        }
    }    /**
    * 非線程安全計數器
    */
    public void count(){
        i++;
    }

}

safeCount()方法的代碼塊實際上是getandIncrement()方法的實現,源碼for循環體第一步優先取得atomicI裏存儲的數值,第二步對atomicI的當前數值進行加1操做,關鍵的第三步調用compareAndSet()方法來進行原子更新操做,該方法先檢查當前數值是否等於current,等於意味着atomicI的值沒有被其餘線程修改過,則將atomicI的當前數值更新成next的值,若是不等compareAndSet()方法會返回false,程序則進入for循環從新進行compareAndSet()方法操做進行不斷嘗試直到成功爲止。在這裏咱們跟蹤下compareAndSet()方法以下

public final boolean compareAndSet(int expect, int update) {    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

從上面源碼咱們發現是使用Unsafe實現的,其實atomic裏的類基本都是使用Unsafe實現的。咱們再回到這個本地方法調用,這個本地方法在openjdk中依次調用c++代碼爲unsafe.cpp、atomic.app和atomic_windows_x86.inline.hpp。關於本地方法實現的源代碼這裏就不貼出來了,其實大致上是程序會根據當前處理器的類型來決定是否爲cmpxchg指令添加lock前綴。若是程序是在多處理器上運行,就爲cmpxchg指令加上lock前綴(Lock Cmpxchg)。反之,若是程序是在單處理器上運行,就省略lock前綴(單處理器自身就會維護單處理器內的順序一致性,不須要lock前綴提供的內存屏障效果)。

locks

鎖是用來控制多個線程訪問共享資源的形式,Java SE 5以後,J.U.C中新增了locks來實現鎖功能,它提供了與synchronized關鍵字相似的同步功能。只是在使用時須要顯示的獲取和釋放鎖。雖然它缺乏了隱式獲取和釋放鎖的便捷性,可是卻擁有了鎖獲取和釋放的可操做性、可中斷的獲取鎖及超時獲取鎖等多種synchronized關鍵字不具有的同步特性。

locks在這咱們只介紹下核心的AQS(AbstractQueuedSynchronizer,隊列同步器),AQS是用來構建鎖或者其餘同步組件的基礎框架,它使用一個用volatile修飾的int成員變量表示同步狀態。經過內置的FIFO隊列來完成資源獲取線程的排隊工做。同步器的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態,在抽象方法的實現過程免不了要對同步狀態進行更改,這時候就會使用到AQS提供的3個方法:getState()、setState()和compareAndSetState()來進行操做,這是由於它們可以保證狀態的改變是原子性的。爲何這麼設計呢?由於鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現細節,而AQS面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操做。鎖和AQS很好的隔離了使用者和實現者鎖關注的領域。

如今咱們就自定義一個獨佔鎖來詳細解釋下AQS的實現機制

public class Mutex implements Lock {    private static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -4387327721959839431L;        protected boolean isHeldExclusively() {            return getState() == 1;
        }        public boolean tryAcquire(int acquires) {            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());                return true;
            }            return false;
        }        protected boolean tryRelease(int releases) {            assert releases == 1; // Otherwise unused
            if (getState() == 0)                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);            return true;
        }        Condition newCondition() {            return new ConditionObject();
        }
    }    private final Sync sync = new Sync();    public void lock() {
        sync.acquire(1);
    }    public boolean tryLock() {        return sync.tryAcquire(1);
    }    public void unlock() {
        sync.release(1);
    }    public Condition newCondition() {        return sync.newCondition();
    }    public boolean isLocked() {        return sync.isHeldExclusively();
    }    public boolean hasQueuedThreads() {        return sync.hasQueuedThreads();
    }    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

實現自定義組件的時候,咱們能夠看到,AQS可重寫的方法是tryAcquire()——獨佔式獲取同步狀態、tryRelease()——獨佔式釋放同步狀態、tryAcquireShared()——共享式獲取同步狀態、tryReleaseShared ()——共享式釋放同步狀態、isHeldExclusively()——是否被當前線程所獨佔。這個示例中,獨佔鎖Mutex是一個自定義同步組件,它在同一時刻只容許一個線程佔有鎖。Mutex中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨佔式獲取和釋放同步狀態。在tryAcquire()中,若是通過CAS設置成功(同步狀態設置爲1),則表示獲取了同步狀態,而在tryRelease()中,只是將同步狀態重置爲0。接着咱們對比一下重入鎖(ReentrantLock)的源碼實現

public class ReentrantLock implements Lock, java.io.Serializable {    private static final long serialVersionUID = 7373984872572414699L;    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();        /**
         * Performs non-fair tryLock.  tryAcquire is
         * implemented in subclasses, but both need nonfair
         * try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);                    return true;
                }
            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);                return true;
            }            return false;
        }        protected final boolean tryRelease(int releases) {            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);            return free;
        }        protected final boolean isHeldExclusively() {            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }        final ConditionObject newCondition() {            return new ConditionObject();
        }        // Methods relayed from outer class

        final Thread getOwner() {            return getState() == 0 ? null : getExclusiveOwnerThread();
        }        final int getHoldCount() {            return isHeldExclusively() ? getState() : 0;
        }        final boolean isLocked() {            return getState() != 0;
        }        /**
         * Reconstitutes this lock instance from a stream.
         * @param s the stream
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {    //todo sth...
    }    //todo sth...}

重入鎖分公平鎖和不公平鎖,默認使用的是不公平鎖,在這咱們看到實現重入鎖大致上跟咱們剛纔自定義的獨佔鎖差很少,可是有什麼區別呢?咱們看看重入鎖nonfairTryAcquire()方法實現:首先獲取同步狀態(默認是0),若是是0的話,CAS設置同步狀態,非0的話則判斷當前線程是否已佔有鎖,若是是的話,則偏向更新同步狀態。從這裏咱們不難推斷出重入鎖的概念,同一個線程能夠屢次得到同一把鎖,在釋放的時候也必須釋放相同次數的鎖。經過對比相信你們對自定義一個鎖有了一個初步的概念,也許你存在疑問咱們重寫的這幾個方法在AQS哪地方用呢?如今咱們來繼續往下跟蹤,咱們深刻跟蹤下剛纔自定義獨佔鎖lock()方法裏面acquire()的實現

public final void acquire(int arg) {    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這個方法在AQS類裏面,看到裏面的tryAcquire(arg)你們也就明白了,tryAcquire(arg)方法獲取同步狀態,後面acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法就是說的節點構造、加入同步隊列及在同步隊列中自旋等待的AQS沒暴露給咱們的相關操做。大致的流程就是首先調用自定義同步器實現的tryAcquire()方法,該方法保證線程安全的獲取同步狀態,若是獲取同步狀態失敗,則構造同步節點(獨佔式Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)並經過addWaiter()方法將該節點加入到同步隊列的尾部,最後調用acquireQueued()方法,使得該節點以「死循環」的方式獲取同步狀態。若是獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要靠前驅節點的出隊或阻塞線程被中斷來實現。也許你仍是不明白剛纔所說的,那麼咱們繼續跟蹤下addWaiter()方法的實現

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;    if (pred != null) {
        node.prev = pred;        if (compareAndSetTail(pred, node)) {
            pred.next = node;            return node;
        }
    }
    enq(node);    return node;
}private Node enq(final Node node) {    for (;;) {
        Node t = tail;        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;            if (compareAndSetTail(t, node)) {
                t.next = node;                return t;
            }
        }
    }
}

上面的代碼經過使用compareAndSetTail()方法來確保節點可以被線程安全添加。在enq()方法中,同步器經過「死循環」來確保節點的正確添加,在」死循環「中只有經過CAS將節點設置成爲尾節點以後,當前線程纔可以從該方法返回,不然,當前線程不斷地嘗試重試設置。

在節點進入同步隊列以後,發生了什麼呢?如今咱們繼續跟蹤下acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;                return interrupted;
            }            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {        if (failed)
            cancelAcquire(node);
    }
}

從上面的代碼咱們不難看出,節點進入同步隊列以後,就進入了一個自旋的過程,每一個節點(或者說每一個線程)都在自省的觀察,當條件知足時(本身的前驅節點是頭節點就進行CAS設置同步狀態)就得到同步狀態,而後就能夠從自旋的過程當中退出,不然依舊在這個自旋的過程當中。

collections

從前面的思惟導圖咱們能夠看到併發容器包括鏈表、隊列、HashMap等.它們都是線程安全的。

  • ConcurrentHashMap : 一個高效的線程安全的HashMap。

  • CopyOnWriteArrayList : 在讀多寫少的場景中,性能很是好,遠遠高於vector。

  • ConcurrentLinkedQueue : 高效併發隊列,使用鏈表實現,能夠當作線程安全的LinkedList。

  • BlockingQueue : 一個接口,JDK內部經過鏈表,數組等方式實現了這個接口,表示阻塞隊列,很是適合用做數據共享 。

  • ConcurrentSkipListMap : 跳錶的實現,這是一個Map,使用跳錶數據結構進行快速查找 。

另外Collections工具類能夠幫助咱們將任意集合包裝成線程安全的集合。在這裏重點說下ConcurrentHashMap和BlockingQueue這兩個併發容器。

咱們都知道HashMap線程不安全的,而咱們能夠經過Collections.synchronizedMap(new HashMap<>())來包裝一個線程安全的HashMap或者使用線程安全的HashTable,可是它們的效率都不是很好,這時候咱們就有了ConcurrentHashMap。爲何ConcurrentHashMap高效且線程安全呢?其實它使用了鎖分段技術來提升了併發的訪問率。假如容器裏有多把鎖,每一把鎖用於鎖容器的一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效地提升併發訪問效率,這就是鎖分段技術。首先將數據分紅一段段的存儲,而後給每段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。而既然數據被分紅了多個段,線程如何定位要訪問的段的數據呢?這裏實際上是經過散列算法來定位的。

如今來談談阻塞隊列,阻塞隊列其實跟後面要談的線程池息息相關的,JDK7提供了7個阻塞隊列,分別是

  • ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。

  • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。

  • PriorityBlockingQueue :一個支持優先級排序的×××阻塞隊列。

  • DelayQueue:一個使用優先級隊列實現的×××阻塞隊列。

  • SynchronousQueue:一個不存儲元素的阻塞隊列。

  • LinkedTransferQueue:一個由鏈表結構組成的×××阻塞隊列。

  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

若是隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?若是讓你來設計阻塞隊列你會如何設計,讓生產者和消費者可以高效率的進行通信呢?讓咱們先來看看JDK是如何實現的。

使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。經過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現,代碼以下:

private final Condition notFull;private final Condition notEmpty;    public ArrayBlockingQueue(int capacity, boolean fair) {        //省略其餘代碼
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {            lock.unlock();
        }
    }    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0)
                notEmpty.await();            return extract();
        } finally {            lock.unlock();
        }
    }    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

當咱們往隊列裏插入一個元素時,若是隊列不可用,阻塞生產者主要經過LockSupport.park(this)來實現

public final void await() throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();
    Node node = addConditionWaiter();    int savedState = fullyRelease(node);    int interruptMode = 0;    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;
    }    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

繼續進入源碼,發現調用setBlocker先保存下將要阻塞的線程,而後調用unsafe.park阻塞當前線程。

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);    unsafe.park(false, 0L);
    setBlocker(t, null);
}

unsafe.park是個native方法,代碼以下:

public native void park(boolean isAbsolute, long time);

park這個方法會阻塞當前線程,只有如下四種狀況中的一種發生時,該方法纔會返回。

  • 與park對應的unpark執行或已經執行時。注意:已經執行是指unpark先執行,而後再執行的park。

  • 線程被中斷時。

  • 若是參數中的time不是零,等待了指定的毫秒數時。

  • 發生異常現象時。這些異常事先沒法肯定。

咱們繼續看一下JVM是如何實現park方法的,park在不一樣的操做系統使用不一樣的方式實現,在linux下是使用的是系統方法pthread_cond_wait實現。實現代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp裏的 os::PlatformEvent::park方法,代碼以下:

void os::PlatformEvent::park() {            int v ;            for (;;) {
                v = _Event ;            if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
            }
            guarantee (v >= 0, "invariant") ;            if (v == 0) {            // Do this the hard way by blocking ...
            int status = pthread_mutex_lock(_mutex);
            assert_status(status == 0, status, "mutex_lock");
            guarantee (_nParked == 0, "invariant") ;
            ++ _nParked ;           while (_Event < 0) {          
           status = pthread_cond_wait(_cond, _mutex);        
           // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...           
           // Treat this the same as if the wait was interrupted         
            if (status == ETIME) { status = EINTR; }        
            assert_status(status == 0 || status == EINTR, status, "cond_wait");        
           }    
           -- _nParked ;          
          // In theory we could move the ST of 0 into _Event past the unlock(),       
          // but then we'd need a MEMBAR after the ST.       
          _Event = 0 ;     
         status = pthread_mutex_unlock(_mutex);    
         assert_status(status == 0, status, "mutex_unlock");       
         }        
          guarantee (_Event >= 0, "invariant") ;
        }
    }

pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思能夠理解爲線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal實現的。park 在windows下則是使用WaitForSingleObject實現的。

當隊列滿時,生產者往阻塞隊列裏插入一個元素,生產者線程會進入WAITING (parking)狀態。

executor

Executor框架提供了各類類型的線程池,不一樣的線程池應用了前面介紹的不一樣的堵塞隊列

enter p_w_picpath description here

Executor框架最核心的類是ThreadPoolExecutor,它是線程池的實現類。 對於核心的幾個線程池,不管是newFixedThreadPool()、newSingleThreadExecutor()仍是newCacheThreadPool()方法,雖然看起來建立的線程具備徹底不一樣的功能特色,但其內部均使用了ThreadPoolExecutor實現

public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,            0L, TimeUnit.MILLISECONDS,            new LinkedBlockingQueue());
}public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,                    0L, TimeUnit.MILLISECONDS,                    new LinkedBlockingQueue()));
}public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,            60L, TimeUnit.SECONDS,            new SynchronousQueue());
}
  • newFixedThreadPool()方法的實現,它返回一個corePoolSize和maximumPoolSize同樣的,並使用了LinkedBlockingQueue任務隊列(×××隊列)的線程池。當任務提交很是頻繁時,該隊列可能迅速膨脹,從而系統資源耗盡。

  • newSingleThreadExecutor()返回單線程線程池,是newFixedThreadPool()方法的退化,只是簡單的將線程池數量設置爲1。

  • newCachedThreadPool()方法返回corePoolSize爲0而maximumPoolSize無窮大的線程池,這意味着沒有任務的時候線程池內沒有現場,而當任務提交時,該線程池使用空閒線程執行任務,若無空閒則將任務加入SynchronousQueue隊列,而SynchronousQueue隊列是直接提交隊列,它老是破事線程池增長新的線程來執行任務。當任務執行完後因爲corePoolSize爲0,所以空閒線程在指定時間內(60s)被回收。對於newCachedThreadPool(),若是有大量任務提交,而任務又不那麼快執行時,那麼系統變回開啓等量的線程處理,這樣作法可能會很快耗盡系統的資源,由於它會增長無窮大數量的線程。

由以上線程池的實現能夠看到,它們都只是ThreadPoolExecutor類的封裝。咱們看下ThreadPoolExecutor最重要的構造函數:

public ThreadPoolExecutor(            //核心線程池,指定了線程池中的線程數量
            int corePoolSize,            //基本線程池,指定了線程池中的最大線程數量
            int maximumPoolSize,            //當前線程池數量超過corePoolSize時,多餘的空閒線程的存活時間,即屢次時間內會被銷燬。
            long keepAliveTime,            //keepAliveTime的單位
            TimeUnit unit,            //任務隊列,被提交但還沒有被執行的任務。
            BlockingQueue workQueue,            //線程工廠,用於建立線程,通常用默認的便可
            ThreadFactory threadFactory,            //拒絕策略,當任務太多來不及處理,如何拒絕任務。
            RejectedExecutionHandler handler)

ThreadPoolExecutor的任務調度邏輯以下

enter p_w_picpath description here

從上圖咱們能夠看出,當提交一個新任務到線程池時,線程池的處理流程以下:

  • 首先線程池判斷基本線程池是否已滿,若是沒滿,建立一個工做線程來執行任務。滿了,則進入下個流程。

  • 其次線程池判斷工做隊列是否已滿,若是沒滿,則將新提交的任務存儲在工做隊列裏。滿了,則進入下個流程。

  • 最後線程池判斷整個線程池是否已滿,若是沒滿,則建立一個新的工做線程來執行任務,滿了,則交給飽和策略來處理這個任務。

下面咱們來看看ThreadPoolExecutor核心調度代碼

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();        /**
        * workerCountOf(c)獲取當前線程池線程總數
        * 當前線程數小於corePoolSize核心線程數時,會將任務經過addWorker(command, true)方法直接調度執行。
        * 不然進入下個if,將任務加入等待隊列
        **/
        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;
            c = ctl.get();
        }        /**
        * workQueue.offer(command) 將任務加入等待隊列。
        * 若是加入失敗(好比有界隊列達到上限或者使用了synchronousQueue)則會執行else。
        *
        **/
        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))
                reject(command);            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        /**
        * addWorker(command, false)直接交給線程池,
        * 若是當前線程已達到maximumPoolSize,則提交失敗執行reject()拒絕策略。
        **/
        else if (!addWorker(command, false))            reject(command);
    }

從上面的源碼咱們能夠知道execute的執行步驟:

  • 若是當前運行的線程少於corePoolSize,則建立新線程來執行任務(注意,執行這一步驟須要獲取全局鎖)。

  • 若是運行的線程等於或多於corePoolSize,則將任務加入到BlockingQueue。

  • 若是沒法將任務假如BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟須要獲取全局鎖)。

  • 若是建立新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor採起上述步驟的整體設計思路,是爲了在執行execute()方法時,儘量的避免獲取全局鎖(那將會是一個嚴重的 可伸縮瓶頸)。在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於等於corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。

參考閱讀

本文部份內容參考自《Java併發編程的藝術》、《深刻理解Java虛擬機(第2版)》、《實戰Java高併發程序設計》、《深刻Java內存模型》、《Java併發編程實踐》,感興趣的可自行查閱。

相關文章
相關標籤/搜索