Java併發編程藝術

volatile域內存知識


如何減小cpu上下文切換

  • 避免使用鎖:無鎖併發編程,多線程競爭鎖時,會引發上下問文切換,因此多線程處理時,能夠用一些辦法來避免使用鎖,如將數據的ID按照Hash算法取模分段,不一樣的線程處理不一樣段的數據
  • CAS算法:java的atomic包使用CAS算法來更新數據,而不須要加鎖
  • 使用最少線程:避免建立不須要的線程,好比任務不多,可是建立了不少線程來處理,這樣會形成大量線程都處於等待狀態
  • 協程:在單線程裏實現多任務的調度,並在單線程裏維持多個任務間的切換。

volatile和synchronized

若是volatile變量修飾符使用恰當的話,它比synchronized的使用和執行成本更低,由於它不會引發線程上下文的切換和調度。html

若是對聲明瞭volatile的變量進行寫操做,JVM就會向處理器發送一條Lock前綴的指令,將這個變量所在緩存行的數據寫回到系統內存java

每一個處理器經過嗅探在總線上傳播的數據來檢查本身緩存的值是否是過時了,當處理器發現本身緩存行對應的內存地址被修改,就會將當前處理器的緩存行設置成無效狀態,當處理器對這個數據進行修改操做的時候,會從新從系統內存中把數據讀處處理器緩存裏。程序員

volatile實現原則算法

  • Lock前綴指令會引發處理器緩存回寫到內存。Lock前綴指令致使在執行指令期間,聲言處理器的LOCK#信號。在多處理器環境中,LOCK#信號確保在聲言該信號期間,處理器能夠獨佔任何共享內存。可是,在最近的處理器裏,LOCK#信號通常不鎖總線,而是鎖緩存,畢竟鎖總線開銷的比較大。
  • 一個處理器的緩存回寫到內存會致使其餘處理器的緩存無效。IA-32處理器和Intel 64處理器使用MESI(修改、獨佔、共享、無效)控制協議去維護內部緩存和其餘處理器緩存的一致性

jdk 7追加字節優化性能編程

  • 將共享變量追加到64字節。一些處理器不支持部分填充緩存行,若是隊列頭節點和尾節點都不足64字節的話,處理器會將他們讀到同一個高速緩存行中,在多處理器下每一個處理器都會緩存一樣的頭、尾節點,當一個處理器試圖修改頭節點時,會將整個緩存行鎖定,那麼在緩存一致性機制的做用下,會致使其餘處理器不能訪問本身高速緩存中的尾節點,而隊列的入隊和出隊操做則須要不停修改頭節點和尾節點,因此在多處理器的狀況下將會嚴重影響到隊列的入隊和出隊效率。Doug lea使用追加到64字節的方式來填滿高速緩衝區的緩存行,避免頭節點和尾節點加載到同一個緩存行,使頭、尾節點在修改時不會互相鎖定。數組

  • 偏向鎖:當一個線程訪問同步塊並獲取鎖時,會在對象頭和棧幀中的鎖記錄裏存儲鎖偏向的線程ID,之後該線程在進入和退出同步塊時不須要進行CAS操做來加鎖和解鎖,只需簡單地測試一下對象頭的Mark Word
    裏是否存儲着指向當前線程的偏向鎖。若是測試成功,表示線程已經得到了鎖。若是測試失敗,則須要再測試一下Mark Word中偏向鎖的標識是否設置成1(表示當前是偏向鎖):若是沒有設置,則使用CAS競爭鎖;若是設置了,則嘗試使用CAS將對象頭的偏向鎖指向當前線程。緩存

  • 輕量級鎖:線程在執行同步塊以前,JVM會先在當前線程的棧楨中建立用於存儲鎖記錄的空間,並將對象頭中的Mark Word複製到鎖記錄中,官方稱爲Displaced Mark Word。而後線程嘗試使用CAS將對象頭中的Mark
    Word替換爲指向鎖記錄的指針。若是成功,當前線程得到鎖,若是失敗,表示其餘線程競爭鎖,當前線程便嘗試使用自旋來獲取鎖。安全

cpu如何保證原子性

總線鎖:線鎖就是使用處理器提供的一個LOCK #信號,當一個處理器在總線上輸出此信號時,其餘處理器的請求將被阻塞住,那麼該處理器能夠獨佔共享內存。微信

緩存鎖:指內存區域若是被緩存在處理器的緩存行中,而且在Lock操做期間被鎖定,那麼當它執行鎖操做回寫到內存時,處理器不在總線上聲言LOCK #信號,而是修改內部的內存地址,並容許它的緩存一致性機制來保證操做的原子性,由於緩存一致性機制會阻止同時修改由兩個以上處理器緩存的內存區域數據,當其餘處理器回寫已被鎖定的緩存行的數據時,會使緩存行無效。多線程

兩種狀況不會使用緩存鎖

  • 第一種狀況:當操做的數據不能被緩存在處理器內部,或操做的數據跨多個緩存行(cache line)時,則處理器會調用總線鎖定。
  • 第二種狀況:有些處理器不支持緩存鎖定。對於Intel 486和Pentium處理器,就算鎖定的內存區域在處理器的緩存行中也會調用總線鎖定。

CAS 原子操做的問題

ABA問題:可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加1,那麼A→B→A就會變成1A→2B→3A。

  • 解決辦法:從Java 1.5開始,JDK的Atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet
    方法的做用是首先檢查當前引用是否等於預期引用,而且檢查當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。

循環時間長開銷大問題。自旋CAS若是長時間不成功,會給CPU帶來很是大的執行開銷。

只能保證一個共享變量的原子操做。還有一個取巧的辦法,就是把多個共享變量合併成一個共享變量來操做。好比,有兩個共享變量i=2,j=a,合併一下ij=2a,而後用CAS來操做ij。

使用鎖機制實現原子操做鎖機制保證了只有得到鎖的線程纔可以操做鎖定的內存區域。JVM內部實現了不少種鎖機制,有偏向鎖、輕量級鎖和互斥鎖。有意思的是除了偏向鎖,JVM實現鎖的方式都用了循環CAS,即當一個線程想進入同步塊的時候使用循環CAS的方式來獲取鎖,當它退出同步塊的時候使用循環CAS釋放鎖。

以何種機制來交換信息


指令重排序

在執行程序時,爲了提升性能,編譯器和處理器經常會對指令作重排序。重排序分3種類型

  1. 編譯器優化的重排序。編譯器在不改變單線程程序語義的前提下,能夠從新安排語句的執行順序。
  2. 指令級並行重排序。現代處理器採用了指令級並行技術(Instruction-Level Parallelism,ILP)來將多條指令重疊執行。若是不存在數據依賴性,處理器能夠改變語句對應機器指令的執行順序。
  3. 內存系統的重排序。因爲處理器使用緩存和讀/寫緩衝區,這使得加載和存儲操做看上去多是在亂序執行。

1屬於編譯器重排序,2和3屬於處理器重排序;

對於編譯器,JMM的編譯器重排序規則會禁止特定類型的編譯器重排序(不是全部的編譯器重排序都要禁止)。對於處理器重排序,JMM的處理器重排序規則會要求Java編譯器在生成指令序列時,插入特定類型的內存屏障(Memory Barriers,Intel稱之爲Memory Fence)指令,經過內存屏障指令來禁止特定類型的處理器重排序。

併發編程模型分類

經過以批處理的方式刷新寫緩衝區,以及合併寫緩衝區中對同一內存地址的屢次寫,減小對內存總線的佔用。雖然寫緩衝區有這麼多好處,但每一個處理器上的寫緩衝區,僅僅對它所在的處理器可見。這個特性會對內存操做的執行順序產生重要的影響:處理器對內存的讀/寫操做的執行順序,不必定與內存實際發生的讀/寫操做順序一致!

sparc-TSO和X86擁有相對較強的處理器內存模型,它們僅容許對寫-讀操做作重排序

StoreLoad Barriers是一個「全能型」的屏障,它同時具備其餘3個屏障的效果。
執行該屏障開銷會很昂貴,由於當前處理器一般要把寫緩衝區中的數據所有刷新到內存中(Buffer Fully Flush)。

happens-before

java使用新的JSR-133內存模型。在JMM中若是一個操做執行的結果須要對另外一個操做可見,那麼這兩個操做之間必需要在happens-before關係。
與程序員密切相關的happens-before規則以下。

  • 程序順序規則:一個線程中的每一個操做,happens-before於該線程中的任意後續操做。

  • 監視器鎖規則:對一個鎖的解鎖,happens-before於隨後對這個鎖的加鎖。

  • volatile變量規則:對一個volatile域的寫,happens- before於任意後續對這個volatile域的讀。

  • 傳遞性:若是A happens-before B,且B happens-before C,那麼Ahappens-before C。

happens-before理解

順序一致性模型

一個線程中的全部操做必須按照程序的順序來執行。

(無論程序是否同步)全部線程都只能看到一個單一的操做執行順序。在順序一致性內存模型中,每一個操做都必須原子執行且馬上對全部線程可見。

當多個線程併發執行時,圖中的開關裝置能把全部線程的全部內存讀/寫操做串行化(即在順序一致性模型中,全部操做之間具備全序關係)。

CPU總線事務

總線事務包括讀事務(Read Transaction)和寫事務(Write Transaction)。讀事務從內存傳送數據處處理器,寫事務從處理器傳送數據到內存,每一個事務會讀/寫內存中一個或多個物理上連續的字。
在一個處理器執行總線事務期間,總線會禁止其餘的處理器和I/O設備執行內存的讀/寫。

當JVM在這種處理器上運行時,可能會把一個64位long/double型變量的寫操做拆分爲兩個32位的寫操做來執行。這兩個32位的寫操做可能會被分配到不一樣的總線事務中執行,此時對這個64位變量的寫操做將不具備原子性。

從JSR -133內存模型開始(即從JDK5開始),僅僅只容許把一個64位long/double型變量的寫操做拆分爲兩個32位的寫操做來執行,任意的讀操做在JSR-133中都必須具備原子性(即任意讀操做必需要在單個讀事務中執行)。

volatile特色

可見性。對一個volatile變量的讀,老是能看到(任意線程)對這個volatile變量最後的寫入。

原子性:對任意單個volatile變量的讀/寫具備原子性,但相似於volatile++這種複合操做不具備原子性。

每個箭頭連接的兩個節點,表明了一個happens-before關係。黑色箭頭表示程序順序規則;橙色箭頭表示volatile規則;藍色箭頭表示組合這些規則後提供的happens-before保證。

A線程寫一個volatile變量後,B線程讀同一個volatile變量。A線程在寫volatile變量以前全部可見的共享變量(即寫以前的值都寫入到JMM中),在B線程讀同一個volatile變量後,將當即變得對B線程可見。


線程A寫一個volatile變量,隨後線程B讀這個volatile變量,這個過程實質上是線程A經過主內存向線程B發送消息。

volatile重排序規則表

  • 當第二個操做是volatile寫時,無論第一個操做是什麼,都不能重排序。這個規則確保volatile寫以前的操做不會被編譯器重排序到volatile寫以後。
  • 當第一個操做是volatile讀時,無論第二個操做是什麼,都不能重排序。這個規則確保volatile讀以後的操做不會被編譯器重排序到volatile讀以前。
  • 當第一個操做是volatile寫,第二個操做是volatile讀時,不能重排序。

JMM插入內存屏障來禁止特定類型的處理器重排序

  • 當第二個操做是volatile寫時,無論第一個操做是什麼,都不能重排序。這個規則確保volatile寫以前的操做不會被編譯器重排序到volatile寫以後。
  • 當第一個操做是volatile讀時,無論第二個操做是什麼,都不能重排序。這個規則確保volatile讀以後的操做不會被編譯器重排序到volatile讀以前。
  • 當第一個操做是volatile寫,第二個操做是volatile讀時,不能重排序。

StoreLoad屏障:一個寫線程寫volatile變量,多個讀線程讀同一個volatile變量。當讀線程的數量大大超過寫線程時,選擇在volatile寫以後插入StoreLoad屏障將帶來可觀的執行效率的提高。


ReentrantLock 中公平鎖和非公平鎖內存語義

公平鎖和非公平鎖釋放時,最後都要寫一個volatile變量state。

公平鎖獲取時,首先會去讀volatile變量。

非公平鎖獲取時,首先會用CAS更新volatile變量,這個操做同時具備volatile讀和volatile寫的內存語義。

concurrent包實現示意圖

final域內存知識

final域重排序規則

在構造函數內對一個final域的寫入,與隨後把這個被構造對象的引用賦值給一個引用變量,這兩個操做之間不能重排序。

初次讀一個包含final域的對象的引用,與隨後初次讀這個final域,這兩個操做之間不能重排序

假設一個線程A執行writer()方法,隨後另外一個線程B執行reader()方法

  • JMM禁止編譯器把final域的寫重排序到構造函數以外。
  • 編譯器會在final域的寫以後,構造函數return以前,插入一個StoreStore屏障。這個屏障禁止處理器把final域的寫重排序到構造函數以外。

讀到普通變量初始化以前的值

對象的普通域的操做被處理器重排序到讀對象引用以前。讀普通域時,該域尚未被寫線程A寫入,這是一個錯誤的讀取操做。而讀final域的重排序規則會把讀對象final域的操做「限定」在讀對象引用以後,此時該final域已經被A線程初始化過了,這是一個正確的讀取操做。

被final修飾的類型爲引用類型

在構造函數內對一個final引用的對象的成員域的寫入,與隨後在構造函數外把這個被構造對象的引用賦值給一個引用變量,這兩個操做之間不能重排序。

  • 1是對final域的寫入,2是對這個final域引用的對象的成員域的寫入,3是把被構造的對象的引用賦值給某個引用變量。這裏除了前面提到的1不能和3重排序外,2和3也不能重排序。

  • JMM能夠確保讀線程C至少能看到寫線程A在構造函數中對final引用對象的成員域的寫入。即C至少能看到數組下標0的值爲1。而寫線程B對數組元素的寫入,讀線程C可能看獲得,也可能看不到。JMM不保證線程B的寫入對讀線程C
    可見,由於寫線程B和讀線程C之間存在數據競爭,此時的執行結果不可預知。

  • 若是想要確保讀線程C看到寫線程B對數組元素的寫入,寫線程B和讀線程C之間須要使用同步原語(lock或volatile)來確保內存可見性。

爲何final引用不能從構造函數內溢出

在引用變量爲任意線程可見以前,該引用變量指向的對象的final域已經在構造函數中被正確初始化過了

在構造函數內部,不能讓這個被構造對象的引用爲其餘線程所見,也就是對象引用不能在構造函數中「逸出」。

執行read()方法的線程仍然可能沒法看到final域被初始化後的值,由於這裏的操做1和操做2之間可能被重排序。


final語義在處理器中的實現

寫final域的重排序規則會要求編譯器在final域的寫以後,構造函數return以前插入一個StoreStore障屏。讀final域的重排序規則要求編譯器在讀final域的操做前面插入一個LoadLoad屏障。因爲X86處理器不會對寫-寫操做作重排序,因此在X86處理器中,寫final域須要的StoreStore障屏會被省略掉。一樣,因爲X86處理器不會對存在間接依賴關係的操做作重排序,因此在X86處理器中,讀final域須要的LoadLoad屏障也會被省略掉。也就是說,在X86處理器中,final域的讀/寫不會插入任何內存屏障!(在x86處理器中僅有StoreLoad屏障)

JMM相關內容

在x86架構下僅有StoreLoad屏障

詳情請見

JMM內存模型設計原則

對於會改變程序執行結果的重排序,JMM要求編譯器和處理器必須禁止這種重排序。

對於不會改變程序執行結果的重排序,JMM對編譯器和處理器不作要求(JMM容許這種重排序)。

happens-before關係的定義

  1. 若是一個操做happens-before另外一個操做,那麼第一個操做的執行結果將對第二個操做可見,並且第一個操做的執行順序排在第二個操做以前。

  2. 兩個操做之間存在happens-before關係,並不意味着Java平臺的具體實現必需要按照happens-before關係指定的順序來執行。若是重排序以後的執行結果,與按happens-before
    關係來執行的結果一致,那麼這種重排序並不非法(也就是說,JMM容許這種重排序)。

  • 上面的1.是JMM對程序員的承諾。從程序員的角度來講,能夠這樣理解happens-before關係:若是A happens-before B,那麼Java內存模型將向程序員保證——A操做的結果將對B可見,且A的執行順序排在B
    以前。注意,這只是Java內存模型向程序員作出的保證!
  • 上面的2.是JMM對編譯器和處理器重排序的約束原則。正如前面所言,JMM實際上是在遵循一個基本原則:只要不改變程序的執行結果(指的是單線程程序和正確同步的多線程程序),編譯器和處理器怎麼優化都行。JMM
    這麼作的緣由是:程序員對於這兩個操做是否真的被重排序並不關心,程序員關心的是程序執行時的語義不能被改變(即執行結果不能被改變)。所以,happens-before關係本質上和as-if-serial語義是一回事。

happens-before 和 as-if-serial 異同點

相同點:

  • as-if-serial語義和happens-before這麼作的目的,都是爲了在不改變程序執行結果的前提下,儘量地提升程序執行的並行度。

不一樣點:

  • as-if-serial語義保證單線程內程序的執行結果不被改變,happens-before關係保證正確同步的多線程程序的執行結果不被改變。
  • as-if-serial語義給編寫單線程程序的程序員創造了一個幻境:單線程程序是按程序的順序來執行的。happens-before關係給編寫正確同步的多線程程序的程序員創造了一個幻境:正確同步的多線程程序是按happens
    -before指定的順序來執行的。

happens-before規則

  1. 程序順序規則:一個線程中的每一個操做,happens-before於該線程中的任意後續操做。
  2. 監視器鎖規則:對一個鎖的解鎖,happens-before於隨後對這個鎖的加鎖。
  3. volatile變量規則:對一個volatile域的寫,happens-before於任意後續對這個volatile域的讀。
  4. 傳遞性:若是A happens-before B,且B happens-before C,那麼A happens-beforeC。
  5. start()規則:若是線程A執行操做ThreadB.start()(啓動線程B),那麼A線程的ThreadB.start()操做happens-before於線程B中的任意操做。
  6. join()規則:若是線程A執行操做ThreadB. join()併成功返回,那麼線程B中的任意操做happens-before於線程A從ThreadB. join()操做成功返回。

  • 1 happens-before 2和3 happens-before 4由程序順序規則產生。因爲編譯器和處理器都要遵照as-if-serial語義,也就是說,as-if-serial
    語義保證了程序順序規則。所以,能夠把程序順序規則當作是對as-if-serial語義的「封裝」。
  • 2 happens-before 3是由volatile規則產生。前面提到過,對一個volatile變量的讀,老是能看到(任意線程)以前對這個volatile變量最後的寫入。所以,volatile
    的這個特性能夠保證明現volatile規則。
  • 1 happens-before 4是由傳遞性規則產生的。這裏的傳遞性是由volatile的內存屏障插入策略和volatile的編譯器重排序規則共同來保證的。

多線程併發初始化對象可能發生指令重排



這裏A2和A3雖然重排序了,但Java內存模型的intra-thread semantics將確保A2必定會排在A4前面執行。所以,線程A的intra-thread semantics沒有改變,但A2和A3的重排序,將致使線程B在B1處判斷出instance不爲空,線程B接下來將訪問instance引用的對象。此時,線程B將會訪問到一個還未初始化的對象。

在知曉了問題發生的根源以後,咱們能夠想出兩個辦法來實現線程安全的延遲初始化。

  • 不容許2和3重排序。
  • 容許2和3重排序,但不容許其餘線程「看到」這個重排序。

基於volatile的解決方案


這個方案本質上是經過禁止圖3-39中的2和3之間的重排序,來保證線程安全的延遲初始化

基於類初始化的解決方案

在執行類的初始化期間,JVM會去獲取一個鎖。這個鎖能夠同步多個線程對同一個類的初始化。


在首次發生下列任意一種狀況時,一個類或接口類型T將被當即初始化

  • T是一個類,並且一個T類型的實例被建立。
  • T是一個類,且T中聲明的一個靜態方法被調用。
  • T中聲明的一個靜態字段被賦值。
  • T中聲明的一個靜態字段被使用,並且這個字段不是一個常量字段。
  • T是一個頂級類(Top Level Class,見Java語言規範的§7.6),並且一個斷言語句嵌套在T內部被執行。

類初始化過程

第1階段:經過在Class對象上同步(即獲取Class對象的初始化鎖),來控制類或接口的初始化。這個獲取鎖的線程會一直等待,直到當前線程可以獲取到這個初始化鎖。


第2階段:線程A執行類的初始化,同時線程B在初始化鎖對應的condition上等待。


第3階段:線程A設置state=initialized,而後喚醒在condition中等待的全部線程。


第4階段:線程B結束類的初始化處理。

線程A在第2階段的A1執行類的初始化,並在第3階段的A4釋放初始化鎖;線程B在第4階段的B1獲取同一個初始化鎖,並在第4階段的B4以後纔開始訪問這個類。根據Java內存模型規範的鎖規則,這裏將存在以下的happens-before關係。這個happens-before關係將保證:線程A執行類的初始化時的寫入操做(執行類的靜態初始化和初始化類中聲明的靜態字段),線程B必定能看到。
第5階段:線程C執行類的初始化的處理。

在第3階段以後,類已經完成了初始化。所以線程C在第5階段的類初始化處理過程相對簡單一些(前面的線程A和B的類初始化處理過程都經歷了兩次鎖獲取-鎖釋放,而線程C的類初始化處理只須要經歷一次鎖獲取-鎖釋放)。線程A在第2階段的A1執行類的初始化,並在第3階段的A4釋放鎖;線程C在第5階段的C1獲取同一個鎖,並在在第5階段的C4以後纔開始訪問這個類。根據Java內存模型規範的鎖規則,將存在以下的happens-before關係。

經過對比基於volatile的雙重檢查鎖定的方案和基於類初始化的方案,咱們會發現基於類初始化的方案的實現代碼更簡潔。但基於volatile的雙重檢查鎖定的方案有一個額外的優點:除了能夠對靜態字段實現延遲初始化外,還能夠對實例字段實現延遲初始化。

字段延遲初始化下降了初始化類或建立實例的開銷,但增長了訪問被延遲初始化的字段的開銷。在大多數時候,正常的初始化要優於延遲初始化。若是確實須要對實例字段使用線程安全的延遲初始化,請使用上面介紹的基於volatile的延遲初始化的方案;若是確實須要對靜態字段使用線程安全的延遲初始化,請使用上面介紹的基於類初始化的方案。

處理器內存模型


內存模型劃分

放鬆程序中寫-讀操做的順序,由此產生了Total Store Ordering內存模型(簡稱爲TSO)。

在上面的基礎上,繼續放鬆程序中寫-寫操做的順序,由此產生了Partial Store Order內存模型(簡稱爲PSO)。

在前面兩條的基礎上,繼續放鬆程序中讀-寫和讀-讀操做的順序,由此產生了RelaxedMemory Order內存模型(簡稱爲RMO)和PowerPC內存模型。

這裏處理器對讀/寫操做的放鬆,是以兩個操做之間不存在數據依賴性爲前提的。

從表3-12中能夠看到,全部處理器內存模型都容許寫-讀重排序,緣由在第1章已經說明過:它們都使用了寫緩存區。寫緩存區可能致使寫-讀操做重排序。同時,咱們能夠看到這些處理器內存模型都容許更早讀到當前處理器的寫,緣由一樣是由於寫緩存區。因爲寫緩存區僅對當前處理器可見,這個特性致使當前處理器能夠比其餘處理器先看到臨時保存在本身寫緩存區中的寫。表3-12中的各類處理器內存模型,從上到下,模型由強變弱。越是追求性能的處理器,內存模型設計得會越弱。由於這些處理器但願內存模型對它們的束縛越少越好,這樣它們就能夠作儘量多的優化來提升性能。

因爲常見的處理器內存模型比JMM要弱,Java編譯器在生成字節碼時,會在執行指令序列的適當位置插入內存屏障來限制處理器的重排序。同時,因爲各類處理器內存模型的強弱不一樣,爲了在不一樣的處理器平臺向程序員展現一個一致的內存模型,JMM在不一樣的處理器中須要插入的內存屏障的數量和種類也不相同。

JMM屏蔽了不一樣處理器內存模型的差別,它在不一樣的處理器平臺之上爲Java程序員呈現了一個一致的內存模型。

各類內存模型之間的關係

JMM是一個語言級的內存模型,處理器內存模型是硬件級的內存模型,順序一致性內存模型是一個理論參考模型。下面是語言內存模型、處理器內存模型和順序一致性內存模型的強弱對比示意圖,如圖3-49所示。

從圖中能夠看出:常見的4種處理器內存模型比經常使用的3中語言內存模型要弱,處理器內存模型和語言內存模型都比順序一致性內存模型要弱。同處理器內存模型同樣,越是追求執行性能的語言,內存模型設計得會越弱。

JMM的內存可見性保證

  • 單線程程序。單線程程序不會出現內存可見性問題。編譯器、runtime和處理器會共同確保單線程程序的執行結果與該程序在順序一致性模型中的執行結果相同。

  • 正確同步的多線程程序。正確同步的多線程程序的執行將具備順序一致性(程序的執行結果與該程序在順序一致性內存模型中的執行結果相同)。這是JMM關注的重點,JMM經過限制編譯器和處理器的重排序來爲程序員提供內存可見性保證。

  • 未同步/未正確同步的多線程程序。JMM爲它們提供了最小安全性保障:線程執行時讀取到的值,要麼是以前某個線程寫入的值,要麼是默認值(0、null、false)。

最小安全性保障與64位數據的非原子性寫並不矛盾。它們是兩個不一樣的概念,它們「發生」的時間點也不一樣。

最小安全性「發生」在對象被任意線程使用以前。64位數據的非原子性寫「發生」在對象被多個線程使用的過程當中(寫共享變量)。

64位數據的非原子性寫「發生」在對象被多個線程使用的過程當中(寫共享變量)。當發生問題時(處理器B看到僅僅被處理器A「寫了一半」的無效值),這裏雖然處理器B讀取到一個被寫了一半的無效值,但這個值仍然是處理器A寫入的,只不過是處理器A尚未寫完而已。

最小安全性保證線程讀取到的值,要麼是以前某個線程寫入的值,要麼是默認值(0、null、false)。但最小安全性並不保證線程讀取到的值,必定是某個線程寫完後的值。最小安全性保證線程讀取到的值不會無中生有的冒出來,但並不保證線程讀取到的值必定是正確的。

JSR-133對舊內存模型的修補

加強volatile的內存語義。舊內存模型容許volatile變量與普通變量重排序。JSR-133嚴格限制volatile變量與普通變量的重排序,使volatile的寫-讀和鎖的釋放-獲取具備相同的內存語義。

加強final的內存語義。在舊內存模型中,屢次讀取同一個final變量的值可能會不相同。爲此,JSR-133爲final增長了兩個重排序規則。在保證final引用不會從構造函數內逸出的狀況下,final具備了初始化安全性。

java線程狀態

線程狀態

線程狀態之間的變化

Daemon線程

Daemon線程被用做完成支持性工做,可是在Java虛擬機退出時Daemon線程中的finally塊並不必定會執行。


main線程(非Daemon線程)在啓動了線程DaemonRunner以後隨着main方法執行完畢而終止,而此時Java虛擬機中已經沒有非Daemon線程,虛擬機須要退出。Java虛擬機中的全部Daemon線程都須要當即終止,所以DaemonRunner當即終止,可是DaemonRunner中的finally塊並無執行。

線程如何初始化

一個新構造的線程對象是由其parent線程來進行空間分配的,而child線程繼承了parent是否爲Daemon、優先級和加載資源的contextClassLoader以及可繼承的ThreadLocal,同時還會分配一個惟一的ID來標識這個child線程。至此,一個可以運行的線程對象就初始化好了,在堆內存中等待着運行。

線程start()方法的含義是:當前線程(即parent線程)同步告知Java虛擬機,只要線程規劃器空閒,應當即啓動調用start()方法的線程。

線程中斷 和 中斷異常

中斷比如其餘線程對該線程打了個招呼,其餘線程經過調用該線程的interrupt()方法對其進行中斷操做。

線程經過檢查自身是否被中斷來進行響應,線程經過方法isInterrupted()來進行判斷是否被中斷,也能夠調用靜態方法Thread.interrupted()對當前線程的中斷標識位進行復位。若是該線程已經處於終結狀態,即便該線程被中斷過,在調用該線程對象的isInterrupted()時依舊會返回false。

從Java的API中能夠看到,許多聲明拋出InterruptedException的方法(例如Thread.sleep(long millis)方法)這些方法在拋出InterruptedException以前,Java虛擬機會先將該線程的中斷標識位清除,而後拋出InterruptedException,此時調用isInterrupted()方法將會返回false。

public class Interrupted {
    public static void main(String[] args) throws Exception {
        // sleepThread不停的嘗試睡眠
        Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");
        sleepThread.setDaemon(true);
        // busyThread不停的運行
        Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
        busyThread.setDaemon(true);
        sleepThread.start();
        busyThread.start();
        // 休眠5秒,讓sleepThread和busyThread充分運行
        TimeUnit.SECONDS.sleep(5);
        sleepThread.interrupt();
        busyThread.interrupt();
        System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
        System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());
        // 防止sleepThread和busyThread馬上退出
        SleepUtils.second(2);
    }
    static class SleepRunner implements Runnable {
        @Override
        public void run() {
            while (true) {
                SleepUtils.second(10);
            }
        }
    }
    static class BusyRunner implements Runnable {
        @Override
        public void run() {
            while (true) {
            }
        }
    }
}

拋出InterruptedException的線程SleepThread,其中斷標識位被清除了,而一直忙碌運做的線程BusyThread,中斷標識位沒有被清除。

synchronized實現細節

本質是對一個對象的監視器(monitor)進行獲取,而這個獲取過程是排他的,也就是同一時刻只能有一個線程獲取到由synchronized所保護對象的監視器。

一個線程對Object(Object由synchronized保護)的訪問,首先要得到Object的監視器。若是獲取失敗,線程進入同步隊列,線程狀態變爲BLOCKED。當訪問Object
的前驅(得到了鎖的線程)釋放了鎖,則該釋放操做喚醒阻塞在同步隊列中的線程,使其從新嘗試對監視器的獲取。

等待通知

等待/通知機制,是指一個線程A調用了對象O的wait()方法進入等待狀態,而另外一個線程B調用了對象O的notify()或者notifyAll()方法,線程A收到通知後從對象O的wait()方法返回,進而執行後續操做。上述兩個線程經過對象O來完成交互,而對象上的wait()和notify/notifyAll()的關係就如同開關信號同樣,用來完成等待方和通知方之間的交互工做。

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        public void run() {
            // 加鎖,擁有lock的Monitor
            synchronized (lock) {
                // 當條件不知足時,繼續wait,同時釋放了lock的鎖
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread()+ " flagistrue.wait
                        @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
            } catch (InterruptedException e) {
            }
     }
     // 條件知足時,完成工做
     System.out.println(Thread.currentThread() + " flag is false. running
     @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
              }
          }
      }
      static class Notify implements Runnable {
          public void run() {
              // 加鎖,擁有lock的Monitor
              synchronized (lock) {
                  // 獲取lock的鎖,而後進行通知,通知時不會釋放lock的鎖,
                  // 直到當前線程釋放了lock後,WaitThread才能從wait方法中返回
                  System.out.println(Thread.currentThread() + " hold lock. notify @ " +
                  new SimpleDateFormat("HH:mm:ss").format(new Date()));
                  lock.notifyAll();
                  flag = false;
                  SleepUtils.second(5);
              }
              // 再次加鎖
              synchronized (lock) {
                  System.out.println(Thread.currentThread() + " hold lock again. sleep
                  @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                  SleepUtils.second(5);
              }
          }
      }
}

public class SleepUtils {
	public static final void second(long seconds) {
		try {
			TimeUnit.SECONDS.sleep(seconds);
		} catch (InterruptedException e){

		}
	}
}

調用wait()、notify()以及notifyAll()時須要注意的細節

  1. 使用wait()、notify()和notifyAll()時須要先對調用對象加鎖。
  2. 調用wait()方法後,線程狀態由RUNNING變爲WAITING,並將當前線程放置到對象的等待隊列。
  3. notify()或notifyAll()方法調用後,等待線程依舊不會從wait()返回,須要調用notify()或notifAll()的線程釋放鎖以後,等待線程纔有機會從wait()返回。
  4. notify()方法將等待隊列中的一個等待線程從等待隊列中移到同步隊列中,而notifyAll()方法則是將等待隊列中全部的線程所有移到同步隊列,被移動的線程狀態由WAITING變爲BLOCKED。
  5. 從wait()方法返回的前提是得到了調用對象的鎖。

WaitThread首先獲取了對象的鎖,而後調用對象的wait()方法,從而放棄了鎖並進入了對象的等待隊列WaitQueue中,進入等待狀態。因爲WaitThread釋放了對象的鎖,NotifyThread隨後獲取了對象的鎖,並調用對象的notify()方法,將WaitThread從WaitQueue移到SynchronizedQueue中,此時WaitThread的狀態變爲阻塞狀態。NotifyThread釋放了鎖以後,WaitThread再次獲取到鎖並從wait()方法返回繼續執行。

ThreadLocal 變量使用

鏈接池案例 鏈接數增長則總連接數增長,同時爲獲取到的比例也在增長

/**
 * 從鏈接池中獲取、使用和釋放鏈接的過程,
 * 而客戶端獲取鏈接的過程被設定爲等待超時的模式,
 * 也就是在1000毫秒內若是沒法獲取到可用鏈接,
 * 將會返回給客戶端一個null。設定鏈接池的大小爲10個,
 * 而後經過調節客戶端的線程數來模擬沒法獲取鏈接的場景。
 */
public class ConnectionPool {
    private LinkedList<Connection> pool = new LinkedList<Connection>();

    public ConnectionPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }

    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                // 鏈接釋放後須要進行通知,這樣其餘消費者可以感知到鏈接池中已經歸還了一個鏈接
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }

    // 在mills內沒法獲取到鏈接,將會返回null
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool) {
            // 徹底超時
            if (mills <= 0) {
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}
/**
 * 咱們經過動態代理構造了一個Connection,該Connection的代理實現僅僅
 * 是在commit()方法調用時休眠100毫秒
 */
public class ConnectionDriver {
    static class ConnectionHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName().equals("commit")) {
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }

    // 建立一個Connection的代理,在commit時休眠100毫秒
    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
                new Class<?>[] { Connection.class }, new ConnectionHandler());
    }
}
/**
 * 使用了CountDownLatch來確保ConnectionRunnerThread可以同時開始執行,
 * 而且在所有結束以後,才使main線程從等待狀態中返回。
 * 當前設定的場景是10個線程同時運行獲取鏈接池(10個鏈接)中的鏈接,
 * 經過調節線程數量來觀察未獲取到鏈接的狀況
 */
public class ConnectionPoolTest {
    static ConnectionPool pool    = new ConnectionPool(10);
    // 保證全部ConnectionRunner可以同時開始
    static CountDownLatch start    = new CountDownLatch(1);
    // main線程將會等待全部ConnectionRunner結束後才能繼續執行
    static CountDownLatch end;

    public static void main(String[] args) throws Exception {
        // 線程數量,能夠修改線程數量進行觀察
        int threadCount = 10;
        end = new CountDownLatch(threadCount);
        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnetionRunner(count, got, notGot),
                    "ConnectionRunnerThread");
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke: " + (threadCount * count));
        System.out.println("got connection: " + got);
        System.out.println("not got connection " + notGot);
    }

    static class ConnetionRunner implements Runnable {
        int        count;
        AtomicInteger    got;
        AtomicInteger    notGot;

        public ConnetionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {
            try {
                start.await();
            } catch (Exception ex) {
            }
            while (count > 0) {
                try {
                    // 從線程池中獲取鏈接,若是1000ms內沒法獲取到,將會返回null
                    // 分別統計鏈接獲取的數量got和未獲取到的數量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception ex) {
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}

線程池

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    // 線程池最大限制數
    private static final intMAX_WORKER_NUMBERS = 10;
    // 線程池默認的數量
    private static final int    DEFAULT_WORKER_NUMBERS = 5;
    // 線程池最小的數量
    private static final int    MIN_WORKER_NUMBERS= 1;
    // 這是一個工做列表,將會向裏面插入工做
    private final LinkedList<Job>    jobs = new LinkedList<Job>();
    // 工做者列表
    private final List<Worker>    workers    = Collections.synchronizedList(new
    ArrayList<Worker>());
    // 工做者線程的數量
    private int  workerNum = DEFAULT_WORKER_NUMBERS;
    // 線程編號生成
    private AtomicLong    threadNum    = new AtomicLong();

    public DefaultThreadPool() {
    initializeWokers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_
        NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWokers(workerNum);
    }

    public void execute(Job job) {
        if (job != null) {
            // 添加一個工做,而後進行通知
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }

    public void addWorkers(int num) {
        synchronized (jobs) {
            // 限制新增的Worker數量不能超過最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWokers(num);
            this.workerNum += num;
        }
    }

    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
            // 按照給定的數量中止Worker
            int count = 0;
            while (count < num) {
                Worker worker = workers.get(count)
                if (workers.remove(worker)) {
                worker.shutdown();
                      count++;
                }
            }
            this.workerNum -= count;
        }
    }

    public int getJobSize() {
        return jobs.size();
    }
    // 初始化線程工做者
    private void initializeWokers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.
            incrementAndGet());
            thread.start();
        }
    }

    // 工做者,負責消費任務
    class Worker implements Runnable {
        // 是否工做
        private volatile boolean running= true;
        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobs) {
                    // 若是工做者列表是空的,那麼就wait
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException ex) {
                            // 感知到外部對WorkerThread的中斷操做,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                     }
                     // 取出一個Job
                     job = jobs.removeFirst();
                }
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception ex) {
                        // 忽略Job執行中的Exception
                    }
                }
             }
          }

          public void shutdown() {
              running = false;
          }
    }
}

lock鎖

鎖和同步器AQS概念區別

鎖是面向使用者的,它定義了使用者與鎖交互的接口(好比能夠容許兩個線程並行訪問),隱藏了實現細節;

同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操做。鎖和同步器很好地隔離了使用者和實現者所需關注的領域

所以同步器提供了一個基於CAS的設置尾節點的方法:compareAndSetTail(Node expect, Node update),它須要傳遞當前線程「認爲」的尾節點和當前節點,只有設置成功後,當前節點才正式與以前的尾節點創建關聯。

同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,將會喚醒後繼節點,然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點,以下圖所示

設置首節點是經過獲取同步狀態成功的線程來完成的,因爲只有一個線程可以成功獲取到同步狀態,所以設置頭節點的方法並不須要使用CAS來保證,它只須要將首節點設置成爲原首節點的後繼節點並斷開原首節點的next引用便可。

參考資料

  1. 書籍名稱:《java併發編程的藝術》 做者:方騰飛 魏鵬 程曉明

歡迎關注微信公衆號哦~ ~

相關文章
相關標籤/搜索