Java中多線程併發體系知識點彙總

1、多線程html

一、操做系統有兩個容易混淆的概念,進程和線程。java

進程:一個計算機程序的運行實例,包含了須要執行的指令;有本身的獨立地址空間,包含程序內容和數據;不一樣進程的地址空間是互相隔離的;進程擁有各類資源和狀態信息,包括打開的文件、子進程和信號處理。算法

線程:表示程序的執行流程,是CPU調度執行的基本單位;線程有本身的程序計數器、寄存器、堆棧和幀。同一進程中的線程共用相同的地址空間,同時共享進進程鎖擁有的內存和其餘資源。編程

二、Java標準庫提供了進程和線程相關的API,進程主要包括表示進程的java.lang.Process類和建立進程的java.lang.ProcessBuilder類;數組

表示線程的是java.lang.Thread類,在虛擬機啓動以後,一般只有Java類的main方法這個普通線程運行,運行時能夠建立和啓動新的線程;還有一類守護線程(damon thread),守護線程在後臺運行,提供程序運行時所需的服務。當虛擬機中運行的全部線程都是守護線程時,虛擬機終止運行。緩存

三、線程間的可見性:一個線程對進程中共享的數據的修改,是否對另外一個線程可見安全

可見性問題:數據結構

a、CPU採用時間片輪轉等不一樣算法來對線程進行調度多線程

[java] view plain copy 架構

  1. public class IdGenerator{

  2. private int value = 0;

  3. public int getNext(){

  4. return value++;

  5. }

  6. }

public class IdGenerator{



   private int value = 0;



   public int getNext(){



     return value++;



   }



}

對於IdGenerator的getNext()方法,在多線程下不能保證返回值是不重複的:各個線程之間相互競爭CPU時間來獲取運行機會,CPU切換可能發生在執行間隙。

以上代碼getNext()的指令序列:CPU切換可能發生在7條指令之間,多個getNext的指令交織在一塊兒。

[java] view plain copy

  1. aload_0

  2. dup

  3. getfield #12

  4. dup_x1

  5. iconst_1

  6. iadd

  7. putfield #12


aload_0



dup



getfield #12



dup_x1



iconst_1



iadd



putfield #12

b、CPU緩存:

目前CPU通常採用層次結構的多級緩存的架構,有的CPU提供了L一、L2和L3三級緩存。當CPU須要讀取主存中某個位置的數據時,會一次檢查各級緩存中是否存在對應的數據。若是有,直接從緩存中讀取,這比從主存中讀取速度快不少。當CPU須要寫入時,數據先被寫入緩存中,以後再某個時間點寫回主存。因此某些時間點上,緩存中的數據與主存中的數據多是不一致。

c、指令順序重排

出行性能考慮,編譯器在編譯時可能會對字節代碼的指令順序進行從新排列,以優化指令的執行順序,在單線程中不會有問題,但在多線程可能產生與可見性相關的問題。

2、Java內存模型(Java Memory Model)

屏蔽了CPU緩存等細節,只關注主存中的共享變量;關注對象的實例域、靜態域和數組元素;關注線程間的動做。

一、volatile關鍵詞:用來對共享變量的訪問進行同步,上一次寫入操做的結果對下一次讀取操做是確定可見的。(在寫入volatile變量值以後,CPU緩存中的內容會被寫回內存;在讀取volatile變量時,CPU緩存中的對應內容會被置爲失效,從新從主存中進行讀取),volatile不使用鎖,性能優於synchronized關鍵詞。

用來確保對一個變量的修改被正確地傳播到其餘線程中。

例子:A線程是Worker,一直跑循環,B線程調用setDone(true),A線程即中止任務

[java] view plain copy

  1. public class Worker{

  2. private volatile boolean done;

  3. public void setDone(boolean done){

  4. this.done = done;

  5. }

  6. public void work(){

  7. while(!done){

  8. //執行任務;

  9. }

  10. }

  11. }


public class Worker{



   private volatile boolean done;



   public void setDone(boolean done){



     this.done = done;



   }



   public void work(){



     while(!done){



         //執行任務;



     }



   }



}

例子:錯誤使用。由於沒有鎖的支持,volatile的修改不能依賴於當前值,當前值可能在其餘線程中被修改。(Worker是直接賦新值與當前值無關)

[java] view plain copy

  1. public class Counter {

  2. public volatile static int count = 0;

  3. public static void inc() {

  4. //這裏延遲1毫秒,使得結果明顯

  5. try {

  6. Thread.sleep(1);

  7. } catch (InterruptedException e) {

  8. }

  9. count++;

  10. }

  11. public static void main(String[] args) {

  12. //同時啓動1000個線程,去進行i++計算,看看實際結果

  13. for (int i = 0; i < 1000; i++) {

  14. new Thread(new Runnable() {

  15. @Override

  16. public void run() {

  17. Counter.inc();

  18. }

  19. }).start();

  20. }

  21. //這裏每次運行的值都有可能不一樣,可能不爲1000

  22. System.out.println("運行結果:Counter.count=" + Counter.count);

  23. }

  24. }


public class Counter {



   public volatile static int count = 0;



   public static void inc() {



       //這裏延遲1毫秒,使得結果明顯



       try {



           Thread.sleep(1);



       } catch (InterruptedException e) {



       }



       count++;



   }



   public static void main(String[] args) {



       //同時啓動1000個線程,去進行i++計算,看看實際結果



       for (int i = 0; i < 1000; i++) {



           new Thread(new Runnable() {



               @Override



               public void run() {



                   Counter.inc();



               }



           }).start();



       }



       //這裏每次運行的值都有可能不一樣,可能不爲1000



       System.out.println("運行結果:Counter.count=" + Counter.count);



   }



}

二、final關鍵詞

final關鍵詞聲明的域的值只能被初始化一次,通常在構造方法中初始化。。(在多線程開發中,final域一般用來實現不可變對象)

當對象中的共享變量的值不可能發生變化時,在多線程中也就不須要同步機制來進行處理,故在多線程開發中應儘量使用不可變對象。

另外,在代碼執行時,final域的值能夠被保存在寄存器中,而不用從主存中頻繁從新讀取。

三、java基本類型的原子操做

1)基本類型,引用類型的複製引用是原子操做;(即一條指令完成)

2)long與double的賦值,引用是能夠分割的,非原子操做;

3)要在線程間共享long或double的字段時,必須在synchronized中操做,或是聲明成volatile

3、Java提供的線程同步方式

一、synchronized關鍵字

方法或代碼塊的互斥性來完成實際上的一個原子操做。(方法或代碼塊在被一個線程調用時,其餘線程處於等待狀態)

全部的Java對象都有一個與synchronzied關聯的監視器對象(monitor),容許線程在該監視器對象上進行加鎖和解鎖操做。

a、靜態方法:Java類對應的Class類的對象所關聯的監視器對象。

b、實例方法:當前對象實例所關聯的監視器對象。

c、代碼塊:代碼塊聲明中的對象所關聯的監視器對象。

注:當鎖被釋放,對共享變量的修改會寫入主存;當活得鎖,CPU緩存中的內容被置爲無效。編譯器在處理synchronized方法或代碼塊,不會把其中包含的代碼移動到synchronized方法或代碼塊以外,從而避免了因爲代碼重排而形成的問題。

例:如下方法getNext()和getNextV2() 都得到了當前實例所關聯的監視器對象

[java] view plain copy

  1. public class SynchronizedIdGenerator{

  2. private int value = 0;

  3. public synchronized int getNext(){

  4. return value++;

  5. }

  6. public int getNextV2(){

  7. synchronized(this){

  8. return value++;

  9. }

  10. }

  11. }


public class SynchronizedIdGenerator{



   private int value = 0;



   public synchronized int getNext(){



     return value++;



   }



   public int getNextV2(){



     synchronized(this){



         return value++;



     }



   }



}

二、Object類的wait、notify和notifyAll方法

生產者和消費者模式,判斷緩衝區是否滿來消費,緩衝區是否空來生產的邏輯。若是用while 和 volatile也能夠作,不過本質上會讓線程處於忙等待,佔用CPU時間,對性能形成影響。

wait: 將當前線程放入,該對象的等待池中,線程A調用了B對象的wait()方法,線程A進入B對象的等待池,而且釋放B的鎖。(這裏,線程A必須持有B的鎖,因此調用的代碼必須在synchronized修飾下,不然直接拋出java.lang.IllegalMonitorStateException異常)。

notify:將該對象中等待池中的線程,隨機選取一個放入對象的鎖池,噹噹前線程結束後釋放掉鎖, 鎖池中的線程便可競爭對象的鎖來得到執行機會。

notifyAll:將對象中等待池中的線程,所有放入鎖池。

(notify鎖喚醒的線程選擇由虛擬機實現來決定,不能保證一個對象鎖關聯的等待集合中的線程按照所指望的順序被喚醒,極可能一個線程被喚醒以後,發現他所要求的條件並無知足,而從新進入等待池。由於當等待池中包含多個線程時,通常使用notifyAll方法,不過該方法會致使線程在沒有必要的狀況下被喚醒,以後又立刻進入等待池,對性能有影響,不過能保證程序的正確性)

工做流程:

a、Consumer線程A 來 看產品,發現產品爲空,調用產品對象的wait(),線程A進入產品對象的等待池並釋放產品的鎖。

b、Producer線程B得到產品的鎖,執行產品的notifyAll(),Consumer線程A從產品的等待池進入鎖池,Producer線程B生產產品,而後退出釋放鎖。

c、Consumer線程A得到產品鎖,進入執行,發現有產品,消費產品,而後退出。

例子:

[java] view plain copy

  1. public synchronized String pop(){

  2. this.notifyAll();// 喚醒對象等待池中的全部線程,可能喚醒的就是 生產者(當生產者發現產品滿,就會進入對象的等待池,這裏代碼省略,基本略同)

  3. while(index == -1){//若是發現沒產品,就釋放鎖,進入對象等待池

  4. this.wait();

  5. }//當生產者生產完後,消費者從this.wait()方法再開始執行,第一次還會執行循環,萬一產品仍是爲空,則再等待,因此這裏必須用while循環,不能用if

  6. String good = buffer[index];

  7. buffer[index] = null;

  8. index--;

  9. return good;// 消費完產品,退出。

  10. }


public synchronized String pop(){



this.notifyAll();// 喚醒對象等待池中的全部線程,可能喚醒的就是 生產者(當生產者發現產品滿,就會進入對象的等待池,這裏代碼省略,基本略同)



   while(index == -1){//若是發現沒產品,就釋放鎖,進入對象等待池



     this.wait();



   }//當生產者生產完後,消費者從this.wait()方法再開始執行,第一次還會執行循環,萬一產品仍是爲空,則再等待,因此這裏必須用while循環,不能用if



   String good = buffer[index];



   buffer[index] = null;



   index--;



   return good;// 消費完產品,退出。



}

注:wait()方法有超時和不超時之分,超時的在通過一段時間,線程還在對象的等待池中,那麼線程也會推出等待狀態。

三、線程狀態轉換:

已經廢棄的方法:stop、suspend、resume、destroy,這些方法在實現上時不安全的。

線程的狀態:NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING(有超時的等待)、TERMINATED。

a、方法sleep()進入的阻塞狀態,不會釋放對象的鎖(即你們一塊兒睡,誰也別想執行代碼),因此不要讓sleep方法處在synchronized方法或代碼塊中,不然形成其餘等待獲取鎖的線程長時間處於等待。

b、方法join()則是主線程等待子線程完成,再往下執行。例如main方法新建兩個線程A和B

[java] view plain copy

  1. public static void main(String[] args) throws InterruptedException {

  2. Thread t1 = new Thread(new ThreadTesterA());

  3. Thread t2 = new Thread(new ThreadTesterB());

  4. t1.start();

  5. t1.join(); // 等t1執行完再往下執行

  6. t2.start();

  7. t2.join(); // 在虛擬機執行中,這句可能被忽略

  8. }


public static void main(String[] args) throws InterruptedException { 



Thread t1 = new Thread(new ThreadTesterA()); 



Thread t2 = new Thread(new ThreadTesterB()); 



t1.start(); 



t1.join(); // 等t1執行完再往下執行



t2.start(); 



t2.join(); // 在虛擬機執行中,這句可能被忽略



}

c、方法interrupt(),向被調用的對象線程發起中斷請求。如線程A經過調用線程B的d的interrupt方法來發出中斷請求,線程B來處理這個請求,固然也能夠忽略,這不是必須的。Object類的wait()、Thread類的join()和sleep方法都會拋出受檢異常java.lang.InterruptedException,經過interrupt方法中斷該線程會致使線程離開等待狀態。對於wait()調用來講,線程須要從新獲取監視器對象上的鎖以後才能拋出InterruptedException異常,並致以異常的處理邏輯。

能夠經過Thread類的isInterrupted方法來判斷是否有中斷請求發生,一般能夠利用這個方法來判斷是否退出線程(相似上面的volatitle修飾符的例子);

Thread類還有個方法Interrupted(),該方法不但能夠判斷當前線程是否被中斷,還會清楚線程內部的中斷標記,若是返回true,即曾被請求中斷,同時調用完後,清除中斷標記。

若是一個線程在某個對象的等待池,那麼notify和interrupt 均可以使該線程從等待池中被移除。若是同時發生,那麼看實際發生順序。若是是notify先,那照常喚醒,沒影響。若是是interrupt先,而且虛擬機選擇讓該線程中斷,那麼即便nofity,也會忽略該線程,而喚醒等待池中的另外一個線程。

e、yield(),嘗試讓出所佔有的CPU資源,讓其餘線程獲取運行機會,對操做系統上的調度器來講是一個信號,不必定當即切換線程。(在實際開發中,測試階段頻繁調用yeid方法使線程切換更頻繁,從而讓一些多線程相關的錯誤更容易暴露出來)。

4、非阻塞方式

線程之間同步機制的核心是監視對象上的鎖,競爭鎖來得到執行代碼的機會。當一個對象獲取對象的鎖,而後其餘嘗試獲取鎖的對象會處於等待狀態,這種鎖機制的實現方式很大程度限制了多線程程序的吞吐量和性能(線程阻塞),且會帶來死鎖(線程A有a對象鎖,等着獲取b對象鎖,線程B有b對象鎖,等待獲取a對象鎖)和優先級倒置(優先級低的線程得到鎖,優先級高的只能等待對方釋放鎖)等問題。

若是能不阻塞線程,又能保證多線程程序的正確性,就能有更好的性能。

在程序中,對共享變量的使用通常遵循必定的模式,即讀取、修改和寫入三步組成。以前碰到的問題是,這三步執行中可能線程執行切換,形成非原子操做。鎖機制是把這三步變成一個原子操做。

目前CPU自己實現 將這三步 合起來 造成一個原子操做,無需線程鎖機制干預,常見的指令是「比較和替換」(compare and swap,CAS),這個指令會先比較某個內存地址的當前值是否是指定的舊指,若是是,就用新值替換,不然什麼也不作,指令返回的結果是內存地址的當前值。經過CAS指令能夠實現不依賴鎖機制的非阻塞算法。通常作法是把CAS指令的調用放在一個無限循環中,不斷嘗試,知道CAS指令成功完成修改。

java.util.concurrent.atomic包中提供了CAS指令。(不是全部CPU都支持CAS,在某些平臺,java.util.concurrent.atomic的實現仍然是鎖機制)

atomic包中提供的Java類分紅三類:

一、支持以原子操做來進行更新的數據類型的Java類(AtomicBoolean、AtomicInteger、AtomicReference),在內存模型相關的語義上,這四個類的對象相似於volatile變量。

類中的經常使用方法:

a、compareAndSet:接受兩個參數,一個是指望的舊值,一個是替換的新值。

b、weakCompareAndSet:效果同compareAndSet(JSR中表示weak原子方式讀取和有條件地寫入變量但不建立任何 happen-before 排序,但在源代碼中和compareAndSet徹底同樣,因此並無按JSR實現)

c、get和set:分別用來直接獲取和設置變量的值。

d、lazySet:與set相似,但容許編譯器把lazySet方法的調用與後面的指令進行重排,所以對值得設置操做有可能被推遲。

例:

[java] view plain copy

  1. public class AtomicIdGenerator{

  2. private final AtomicInter counter = new AtomicInteger(0);

  3. public int getNext(){

  4. return counter.getAndIncrement();

  5. }

  6. }

  7. // getAndIncrement方法的內部實現方式,這也是CAS方法的通常模式,CAS方法不必定成功,因此包裝在一個無限循環中,直到成功

  8. public final int getAndIncrement(){

  9. for(;;){

  10. int current = get();

  11. int next = current +1;

  12. if(compareAndSet(current,next))

  13. return current;

  14. }

  15. }


public class AtomicIdGenerator{



   private final AtomicInter counter = new AtomicInteger(0);



   public int getNext(){



     return counter.getAndIncrement();



   }



}



// getAndIncrement方法的內部實現方式,這也是CAS方法的通常模式,CAS方法不必定成功,因此包裝在一個無限循環中,直到成功



public final int getAndIncrement(){



   for(;;){



     int current = get();



     int next = current +1;



     if(compareAndSet(current,next))



         return current;



   }



}

二、提供對數組類型的變量進行處理的Java類,AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray類。(同上,只是放在類數組裏,調用時也只是多了一個操做元素索引的參數)

三、經過反射的方式對任何對象中包含的volatitle變量使用CAS方法,AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。他們提供了一種方式把CAS的功能擴展到了任何Java類中聲明爲volatitle的域上。(靈活,但語義較弱,由於對象的volatitle可能被非atomic的其餘方式被修改)

[java] view plain copy

  1. public class TreeNode{

  2. private volatile TreeNode parent;

  3. // 靜態工廠方法

  4. private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");

  5. public boolean compareAndSetParent(TreeNode expect, TreeNode update){

  6. return parentUpdater.compareAndSet(this, expect, update);

  7. }

  8. }


public class TreeNode{



   private volatile TreeNode parent;



// 靜態工廠方法



   private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");



public boolean compareAndSetParent(TreeNode expect, TreeNode update){



     return parentUpdater.compareAndSet(this, expect, update);



}



}

注:java.util.concurrent.atomic包中的Java類屬於比較底層的實現,通常做爲java.util.concurrent包中不少非阻塞的數據結構的實現基礎。

比較多的用AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference。在實現線程安全的計數器時,AtomicInteger和AtomicLong類時最佳的選擇。

5、高級同步機制(比synchronized更靈活的加鎖機制)

synchronized和volatile,以及wait、notify等方法抽象層次低,在程序開發中使用比較繁瑣,易出錯。

而多線程之間的交互來講,存在某些固定的模式,如生產者-消費者和讀者-寫者模式,把這些模式抽象成高層API,使用起來會很是方便。

java.util.concurrent包爲多線程提供了高層的API,知足平常開發中的常見需求。

經常使用接口

一、Lock接口,表示一個鎖方法:

a、lock(),獲取所,若是沒法獲取所鎖,會處於等待狀態

b、unlock(),釋放鎖。(通常放在finally代碼塊中)

c、lockInterruptibly(),與lock()相似,但容許當前線程在等待獲取鎖的過程當中被中斷。(因此要處理InterruptedException)

d、tryLock(),以非阻塞方式獲取鎖,若是沒法獲取鎖,則返回false。(tryLock()的另外一個重載能夠指定超時,若是指定超時,當沒法獲取鎖,會等待而阻塞,同時線程能夠被中斷)

二、ReadWriteLock接口,表示兩個鎖,讀取的共享鎖和寫入的排他鎖。(適合常見的讀者--寫者場景)

ReadWriteLock接口的readLock和writeLock方法來獲取對應的鎖的Lock接口的實現。

在多數線程讀取,少數線程寫入的狀況下,能夠提升多線程的性能,提升使用該數據結構的吞吐量。

若是是相反的狀況,較多的線程寫入,則接口會下降性能。

三、ReentrantLock類和ReentrantReadWriteLock,分別爲上面兩個接口的實現類。

他們具備重入性:即容許一個線程屢次獲取同一個鎖(他們會記住上次獲取鎖而且未釋放的線程對象,和加鎖的次數,getHoldCount())

同一個線程每次獲取鎖,加鎖數+1,每次釋放鎖,加鎖數-1,到0,則該鎖被釋放,能夠被其餘線程獲取。

[java] view plain copy

  1. public class LockIdGenrator{

  2. //new ReentrantLock(true)是重載,使用更加公平的加鎖機制,在鎖被釋放後,會優先給等待時間最長的線程,避免一些線程長期沒法得到鎖

  3. private int ReentrantLock lock = ReentrantLock();

  4. privafte int value = 0;

  5. public int getNext(){

  6. lock.lock(); //進來就加鎖,沒有鎖會等待

  7. try{

  8. return value++;//實際操做

  9. }finally{

  10. lock.unlock();//釋放鎖

  11. }

  12. }

  13. }


public class LockIdGenrator{



//new ReentrantLock(true)是重載,使用更加公平的加鎖機制,在鎖被釋放後,會優先給等待時間最長的線程,避免一些線程長期沒法得到鎖



   private int ReentrantLock lock = ReentrantLock();



   privafte int value = 0;



   public int getNext(){



     lock.lock();     //進來就加鎖,沒有鎖會等待



     try{



         return value++;//實際操做



     }finally{



         lock.unlock();//釋放鎖



     }



   }



}

注:重入性減小了鎖在各個線程之間的等待,例如便利一個HashMap,每次next()以前加鎖,以後釋放,能夠保證一個線程一口氣完成便利,而不會每次next()以後釋放鎖,而後和其餘線程競爭,下降了加鎖的代價, 提供了程序總體的吞吐量。(即,讓一個線程一口氣完成任務,再把鎖傳遞給其餘線程)。

四、Condition接口,Lock接口代替了synchronized,Condition接口替代了object的wait、nofity。

a、await(),使當前線程進入等待狀態,知道被喚醒或中斷。重載形式能夠指定超時時間。

b、awaitNanos(),以納秒爲單位等待。

c、awaitUntil(),指定超時發生的時間點,而不是通過的時間,參數爲java.util.Date。

d、awaitUninterruptibly(),前面幾種會響應其餘線程發出的中斷請求,他會無視,直到被喚醒。

注:與Object類的wait()相同,await()會釋放其所持有的鎖。

e、signal()和signalAll, 至關於 notify和notifyAll

[java] view plain copy

  1. Lock lock = new ReentrantLock();

  2. Condition condition = lock.newCondition();

  3. lock.lock();

  4. try{

  5. while(/邏輯條件不知足/){

  6. condition.await();

  7. }

  8. }finally{

  9. lock.unlock();

  10. }


Lock lock = new ReentrantLock();



Condition condition = lock.newCondition();



lock.lock();



try{



   while(/*邏輯條件不知足*/){



     condition.await();  



   }



}finally{



   lock.unlock();



}

6、底層同步器

多線程程序中,線程之間存在多種不一樣的同步方式。除了Java標準庫提供的同步方式以外,程序中特有的同步方式須要由開發人員本身來實現。

常見的一種需求是 對有限個共享資源的訪問,好比多臺我的電腦,2臺打印機,當多個線程在等待同一個資源時,從公平角度出發,會用FIFO隊列。

若是程序中的同步方式能夠抽象成對有限個資源的訪問,那麼可使用java.util.concurrent.locks包中的AbstractQueuedSynchronizer類和AbstractQueuedLongSynchronizer類做爲實現的基礎,前者用int類型的變量來維護內部狀態,然後者用long類型。(能夠將這個變量理解爲共享資源個數)

經過getState、setState、和compareAndSetState3個方法更新內部變量的值。

AbstractQueuedSynchronizer類是abstract的,須要覆蓋其中包含的部分方法,一般作法是把其做爲一個Java類的內部類,外部類提供具體的同步方式,內部類則做爲實現的基礎。有兩種模式,排他模式和共享模式,分別對應方法 tryAcquire()、tryRelease 和 tryAcquireShared、tryReleaseShared,在這些方法中,使用getState、setState、compareAndSetState3個方法來修改內部變量的值,以此來反應資源的狀態。

[java] view plain copy

  1. public class SimpleResourceManager{

  2. private final InnerSynchronizer synchronizer;

  3. private static class InnerSynchronizer extends AbstractQueuedSynchronizer{

  4. InnerSynchronizer(int numOfResources){

  5. setState(numOfResources);

  6. }

  7. protected int tryAcquireShared(int acquires){

  8. for(;;){

  9. int available = getState();

  10. int remain = available - acquires;

  11. if(remain <0 || comapreAndSetState(available, remain){

  12. return remain;

  13. }

  14. }

  15. }

  16. protected boolean try ReleaseShared(int releases){

  17. for(;;){

  18. int available = getState();

  19. int next = available + releases;

  20. if(compareAndSetState(available,next){

  21. return true;

  22. }

  23. }

  24. }

  25. }

  26. public SimpleResourceManager(int numOfResources){

  27. synchronizer = new InnerSynchronizer(numOfResources);

  28. }

  29. public void acquire() throws InterruptedException{

  30. synchronizer.acquireSharedInterruptibly(1);

  31. }

  32. pubic void release(){

  33. synchronizer.releaseShared(1);

  34. }

  35. }


public class SimpleResourceManager{



   private final InnerSynchronizer synchronizer;



   private static class InnerSynchronizer extends AbstractQueuedSynchronizer{



     InnerSynchronizer(int numOfResources){



         setState(numOfResources);



     }



     protected int tryAcquireShared(int acquires){



         for(;;){



           int available = getState();



           int remain = available - acquires;



           if(remain <0 || comapreAndSetState(available, remain){



               return remain;



           }



         }



     }



     protected boolean try ReleaseShared(int releases){



         for(;;){



           int available = getState();



           int next = available + releases;



           if(compareAndSetState(available,next){



               return true;



           }



         }



     }



   }



   public SimpleResourceManager(int numOfResources){



     synchronizer = new InnerSynchronizer(numOfResources);



   }



   public void acquire() throws InterruptedException{



     synchronizer.acquireSharedInterruptibly(1);



   }     



   pubic void release(){   



     synchronizer.releaseShared(1);



   }



}

7、高級同步對象(提升開發效率)

atomic和locks包提供的Java類能夠知足基本的互斥和同步訪問的需求,但這些Java類的抽象層次較低,使用比較複雜。

更簡單的作法是使用java.util.concurrent包中的高級同步對象。

一、信號量。

信號量通常用來數量有限的資源,每類資源有一個對象的信號量,信號量的值表示資源的可用數量。

在使用資源時,須要從該信號量上獲取許可,成功獲取許可,資源的可用數-1;完成對資源的使用,釋放許可,資源可用數+1; 當資源數爲0時,須要獲取資源的線程以阻塞的方式來等待資源,或過段時間以後再來檢查資源是否可用。(上面的SimpleResourceManager類實際上時信號量的一個簡單實現)

java.util.concurrent.Semaphore類,在建立Semaphore類的對象時指定資源的可用數

a、acquire(),以阻塞方式獲取許可

b、tryAcquire(),以非阻塞方式獲取許可

c、release(),釋放許可。

d、accquireUninterruptibly(),accquire()方法獲取許能夠的過程能夠被中斷,若是不但願被中斷,使用此方法。

[java] view plain copy

  1. public class PrinterManager{

  2. private final Semphore semaphore;

  3. private final List<Printer> printers = new ArrayList<>():

  4. public PrinterManager(Collection<? extends Printer> printers){

  5. this.printers.addAll(printers);

  6. //這裏重載方法,第二個參數爲true,以公平競爭模式,防止線程飢餓

  7. this.semaphore = new Semaphore(this.printers.size(),true);

  8. }

  9. public Printer acquirePrinter() throws InterruptedException{

  10. semaphore.acquire();

  11. return getAvailablePrinter();

  12. }

  13. public void releasePrinter(Printer printer){

  14. putBackPrinter(pinter);

  15. semaphore.release();

  16. }

  17. private synchronized Printer getAvailablePrinter(){

  18. printer result = printers.get(0);

  19. printers.remove(0);

  20. return result;

  21. }

  22. private synchronized void putBackPrinter(Printer printer){

  23. printers.add(printer);

  24. }

  25. }


public class PrinterManager{



   private final Semphore semaphore;



   private final List<Printer> printers = new ArrayList<>():



   public PrinterManager(Collection<? extends Printer> printers){



     this.printers.addAll(printers);



     //這裏重載方法,第二個參數爲true,以公平競爭模式,防止線程飢餓



     this.semaphore = new Semaphore(this.printers.size(),true);



   }



   public Printer acquirePrinter() throws InterruptedException{



     semaphore.acquire();



     return getAvailablePrinter();



   }



   public void releasePrinter(Printer printer){



     putBackPrinter(pinter);



     semaphore.release();



   }



   private synchronized Printer getAvailablePrinter(){



     printer result = printers.get(0);



     printers.remove(0);



     return result;



   }



   private synchronized void putBackPrinter(Printer printer){



     printers.add(printer);



   }



}

二、倒數閘門

多線程協做時,一個線程等待另外的線程完成任務才能繼續進行。

java.util.concurrent.CountDownLatch類,建立該類時,指定等待完成的任務數;當一個任務完成,調用countDonw(),任務數-1。等待任務完成的線程經過await(),進入阻塞狀態,直到任務數量爲0。CountDownLatch類爲一次性,一旦任務數爲0,再調用await()再也不阻塞當前線程,直接返回。

例:

[java] view plain copy

  1. public class PageSizeSorter{

  2. // 併發性能遠遠優於HashTable的 Map實現,hashTable作任何操做都須要得到鎖,同一時間只有有個線程能使用,而ConcurrentHashMap是分段加鎖,不一樣線程訪問不一樣的數據段,徹底不受影響,忘記HashTable吧。

  3. private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();

  4. private static class GetSizeWorker implements Runnable{

  5. private final String urlString;

  6. public GetSizeWorker(String urlString , CountDownLatch signal){

  7. this.urlString = urlStirng;

  8. this.signal = signal;

  9. }

  10. public void run(){

  11. try{

  12. InputStream is = new URL(urlString).openStream();

  13. int size = IOUtils.toByteArray(is).length;

  14. sizeMap.put(urlString, size);

  15. }catch(IOException e){

  16. sizeMap.put(urlString, -1);

  17. }finally{

  18. signal.countDown()://完成一個任務 , 任務數-1

  19. }

  20. }

  21. }

  22. private void sort(){

  23. List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());

  24. Collections.slort(list, new Comparator<Entry<String,Integer>>(){

  25. public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){

  26. return Integer.compare(o2.getValue(),o1.getValue());

  27. };

  28. System.out.println(Arrays.deepToString(list.toArray()));

  29. }

  30. public void sortPageSize(Collection<String> urls) throws InterruptedException{

  31. CountDownLatch sortSignal = new CountDownLatch(urls.size());

  32. for(String url: urls){

  33. new Thread(new GetSizeWorker(url, sortSignal)).start();

  34. }

  35. sortSignal.await()://主線程在這裏等待,任務數歸0,則繼續執行

  36. sort();

  37. }

  38. }


public class PageSizeSorter{



   // 併發性能遠遠優於HashTable的 Map實現,hashTable作任何操做都須要得到鎖,同一時間只有有個線程能使用,而ConcurrentHashMap是分段加鎖,不一樣線程訪問不一樣的數據段,徹底不受影響,忘記HashTable吧。



   private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();



   private static class GetSizeWorker implements Runnable{



     private final String urlString;



     public GetSizeWorker(String urlString , CountDownLatch signal){



         this.urlString = urlStirng;



         this.signal = signal;



     }



     public void run(){



         try{



           InputStream is = new URL(urlString).openStream();



           int size = IOUtils.toByteArray(is).length;



           sizeMap.put(urlString, size);



         }catch(IOException e){



           sizeMap.put(urlString, -1);



         }finally{



           signal.countDown()://完成一個任務 , 任務數-1



         }



     }



   }



   private void sort(){



     List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());



     Collections.slort(list, new Comparator<Entry<String,Integer>>(){



         public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){



           return Integer.compare(o2.getValue(),o1.getValue());



     };



     System.out.println(Arrays.deepToString(list.toArray()));



   }



   public void sortPageSize(Collection<String> urls) throws InterruptedException{



     CountDownLatch sortSignal = new CountDownLatch(urls.size());



     for(String url: urls){



         new Thread(new GetSizeWorker(url, sortSignal)).start();



     }



     sortSignal.await()://主線程在這裏等待,任務數歸0,則繼續執行



     sort();



   }



}

三、循環屏障

循環屏障在做用上相似倒數閘門,不過他不像倒數閘門是一次性的,能夠循環使用。另外,線程之間是互相平等的,彼此都須要等待對方完成,當一個線程完成本身的任務以後,等待其餘線程完成。當全部線程都完成任務以後,全部線程才能夠繼續運行。

當線程之間須要再次進行互相等待時,能夠複用同一個循環屏障。

類java.uti.concurrent.CyclicBarrier用來表示循環屏障,建立時指定使用該對象的線程數目,還能夠指定一個Runnable接口的對象做爲每次循環後執行的動做。(當最後一個線程完成任務以後,全部線程繼續執行以前,被執行。若是線程之間須要更新一些共享的內部狀態,能夠利用這個Runnalbe接口的對象來處理)。

每一個線程任務完成以後,經過調用await方法進行等待,當全部線程都調用await方法以後,處於等待狀態的線程均可以繼續執行。在全部線程中,只要有一個在等待中被中斷,超時或是其餘錯誤,整個循環屏障會失敗,全部等待中的其餘線程拋出java.uti.concurrent.BrokenBarrierException。

例:每一個線程負責找一個數字區間的質數,當全部線程完成後,若是質數數目不夠,繼續擴大範圍查找

[java] view plain copy

  1. public class PrimeNumber{

  2. private static final int TOTAL_COUTN = 5000;

  3. private static final int RANGE_LENGTH= 200;

  4. private static final int WORKER_NUMBER = 5;

  5. private static volatitle boolean done = false;

  6. private static int rangeCount = 0;

  7. private static final List<Long> results = new ArrayList<Long>():

  8. private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable(){

  9. public void run(){

  10. if(results.size() >= TOTAL_COUNT){

  11. done = true;

  12. }

  13. }

  14. });

  15. private static class PrimeFinder implements Runnable{

  16. public void run(){

  17. while(!done){// 整個過程在一個 while循環下,await()等待,下次循環開始,會再次判斷 執行條件

  18. int range = getNextRange();

  19. long start = rang * RANGE_LENGTH;

  20. long end = (range + 1) * RANGE_LENGTH;

  21. for(long i = start; i<end;i++){

  22. if(isPrime(i)){

  23. updateResult(i);

  24. }

  25. }

  26. try{

  27. barrier.await();

  28. }catch (InterruptedException | BokenBarrierException e){

  29. done = true;

  30. }

  31. }

  32. }

  33. }

  34. private synchronized static void updateResult(long value){

  35. results.add(value);

  36. }

  37. private synchronized static int getNextRange(){

  38. return rangeCount++;

  39. }

  40. private static boolean isPrime(long number){

  41. //找質數的代碼

  42. }

  43. public void calculate(){

  44. for(int i=0;i<WORKER_NUMBER;i++){

  45. new Thread(new PrimeFinder()).start();

  46. }

  47. while(!done){

  48. }

  49. //計算完成

  50. }

  51. }


public class PrimeNumber{



   private static final int TOTAL_COUTN = 5000;



   private static final int RANGE_LENGTH= 200;



   private static final int WORKER_NUMBER = 5;



   private static volatitle boolean done = false;



   private static int rangeCount = 0;



   private static final List<Long> results = new ArrayList<Long>():



   private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable(){



     public void run(){



         if(results.size() >= TOTAL_COUNT){



           done = true;



         }



     }



   });



   private static class PrimeFinder implements Runnable{



     public void run(){



         while(!done){// 整個過程在一個 while循環下,await()等待,下次循環開始,會再次判斷 執行條件



           int range = getNextRange();



           long start = rang * RANGE_LENGTH;



           long end = (range + 1) * RANGE_LENGTH;



           for(long i = start; i<end;i++){



               if(isPrime(i)){



                 updateResult(i);



               }



           }



           try{



               barrier.await();



           }catch (InterruptedException | BokenBarrierException e){



               done = true;



           }



         }



     }



   }



   private synchronized static void updateResult(long value){



     results.add(value);



   }



   private synchronized static int getNextRange(){



     return rangeCount++;



   }



   private static boolean isPrime(long number){



     //找質數的代碼



   }



   public void calculate(){



     for(int i=0;i<WORKER_NUMBER;i++){



         new Thread(new PrimeFinder()).start();



     }



     while(!done){







     }



     //計算完成



   }



}

四、對象交換器

適合於兩個線程須要進行數據交換的場景。(一個線程完成後,把結果交給另外一個線程繼續處理)

java.util.concurrent.Exchanger類,提供了這種對象交換能力,兩個線程共享一個Exchanger類的對象,一個線程完成對數據的處理以後,調用Exchanger類的exchange()方法把處理以後的數據做爲參數發送給另一個線程。而exchange方法的返回結果是另一個線程鎖提供的相同類型的對象。若是另一個線程未完成對數據的處理,那麼exchange()會使當前線程進入等待狀態,直到另一個線程也調用了exchange方法來進行數據交換。

例:

[java] view plain copy

  1. public class SendAndReceiver{

  2. private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();

  3. private class Sender implements Runnable{

  4. public void run(){

  5. try{

  6. StringBuilder content = new StringBuilder("Hello");

  7. content = exchanger.exchange(content);

  8. }catch(InterruptedException e){

  9. Thread.currentThread().interrupt();

  10. }

  11. }

  12. }

  13. private class Receiver implements Runnable{

  14. public void run(){

  15. try{

  16. StringBuilder content = new StringBuilder("World");

  17. content = exchanger.exchange(content);

  18. }catch(InterruptedException e){

  19. Thread.currentThread().interrupt();

  20. }

  21. }

  22. }

  23. public void exchange(){

  24. new Thread(new Sender()).start();

  25. new Thread(new Receiver()).start();

  26. }

  27. }


public class SendAndReceiver{



   private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();



   private class Sender implements Runnable{



     public void run(){



         try{



           StringBuilder content = new StringBuilder("Hello");



           content = exchanger.exchange(content);



         }catch(InterruptedException e){



           Thread.currentThread().interrupt();



         }



     }



   }



   private class Receiver implements Runnable{



     public void run(){



         try{



           StringBuilder content = new StringBuilder("World");



           content = exchanger.exchange(content);



         }catch(InterruptedException e){



           Thread.currentThread().interrupt();



         }



     }



   }



   public void exchange(){



     new Thread(new Sender()).start();



     new Thread(new Receiver()).start();



   }



}

8、數據結構(多線程程序使用的高性能數據結構)

java.util.concurrent包中提供了一些適合多線程程序使用的高性能數據結構,包括隊列和集合類對象等。

一、隊列

a、BlockingQueue接口:線程安全的阻塞式隊列;當隊列已滿時,想隊列添加會阻塞;當隊列空時,取數據會阻塞。(很是適合消費者-生產者模式)

阻塞方式:put()、take()。

非阻塞方式:offer()、poll()。

實現類:基於數組的固定元素個數的ArrayBolockingQueue和基於鏈表結構的不固定元素個數的LinkedBlockQueue類。

b、BlockingDeque接口: 與BlockingQueue類似,但能夠對頭尾進行添加和刪除操做的雙向隊列;方法分爲兩類,分別在隊首和對尾進行操做。

實現類:標準庫值提供了一個基於鏈表的實現,LinkedBlockgingDeque

二、集合類

在多線程程序中,若是共享變量時集合類的對象,則不適合直接使用java.util包中的集合類。這些類要麼不是線程安全,要麼在多線程下性能比較差。

應該使用java.util.concurrent包中的集合類。

a、ConcurrentMap接口: 繼承自java.util.Map接口

putIfAbsent():只有在散列表不包含給定鍵時,纔會把給定的值放入。

remove():刪除條目。

replace(key,value):把value 替換到給定的key上。

replace(key, oldvalue, newvalue):CAS的實現。

實現類:ConcurrentHashMap

建立時,若是能夠預估可能包含的條目個數,能夠優化性能。(由於動態調整所能包含的數目操做比較耗時,這個HashMap也同樣,只是多線程下更耗時)。

建立時,預估進行更新操做的線程數,這樣實現中會根據這個數把內部空間劃分爲對應數量的部分。(默認是16,若是隻有一個線程進行寫操做,其餘都是讀取,那麼把值設爲1 能夠提升性能)。

注:當從集合中建立出迭代器遍歷Map元素時,不必定能看到正在添加的數據,只能和集合保證弱一致性。(固然使用迭代器不會由於查看正在改變的Map,而拋出java.util.ConcurrentModifycationException)

b、CopyOnWriteArrayList接口:繼承自java.util.List接口。

顧名思義,在CopyOnWriteArrayList的實現類,全部對列表的更新操做都會新建立一個底層數組的副本,並使用副原本存儲數據;對列表更新操做加鎖,讀取操做不加鎖。

適合多讀取少修改的場景,若是更新操做多,那麼不適合用,一樣迭代器只能表示建立時列表的狀態,更新後使用了新的底層數組,迭代器仍是引用舊的底層數組。

9、多線程任務的執行

過去線程的執行,是先建立Thread類的想,再調用start方法啓動,這種作法要求開發人員對線程進行維護,在線程較多時,通常建立一個線程池同一管理,同時下降重複建立線程的開銷

在J2SE5.0中,java.util.concurrent包提供了豐富的用來管理線程和執行任務的實現。

一、基本接口(描述任務)

a、Callable接口:

Runnable接口受限於run方法的類型簽名,而Callable只有一個方法call(),能夠有返回值,能夠拋出受檢異常。

b、Future接口:

過去,須要異步線程的任務執行結果,要求主線程和任務執行線程之間進行同步和數據傳遞。

Future簡化了任務的異步執行,做爲異步操做的一個抽象。調用get()方法能夠獲取異步的執行結果,若是任務沒有執行完,會等待,直到任務完成或被取消,cancel()能夠取消。

c、Delayed接口:

延遲執行任務,getDelay()返回當前剩餘的延遲時間,若是不大於0,說明延遲時間已通過去,應該調度並執行該任務。

二、組合接口(描述任務)

a、RunnableFuture接口:繼承自Runnable接口和Future接口。

當來自Runnalbe接口中的run方法成功執行以後,至關於Future接口表示的異步任務已經完成,能夠經過get()獲取運行結果。

b、ScheduledFuture接口:繼承Future接口和Delayed接口,表示一個能夠調用的異步操做。

c、RunnableScheduledFuture接口:繼承自Runnable、Delayed和Future,接口中包含isPeriodic,代表該異步操做是否能夠被重複執行。

三、Executor接口、ExcutorServer接口、ScheduleExecutorService接口和CompletionService接口(描述任務執行)

a、executor接口,execute()用來執行一個Runnable接口的實現對象,不一樣的Executor實現採起不一樣執行策略,但提供的任務執行功能比較弱。

b、excutorServer接口,繼承自executor;

提供了對任務的管理:submit(),能夠吧Callable和Runnable做爲任務提交,獲得一個Future做爲返回,能夠獲取任務結果或取消任務。

提供批量執行:invokeAll()和invokeAny(),同時提交多個Callable;invokeAll(),會等待全部任務都執行完成,返回一個包含每一個任務對應Future的列表;invokeAny(),任何一個任務成功完成,即返回該任務結果。

提供任務關閉:shutdown()、shutdownNow()來關閉服務,前者不容許新的任務提交,後者試圖終止正在運行和等待的任務,並返回已經提交單沒有被運行的任務列表。(兩個方法都不會等待服務真正關閉,只是發出關閉請求。)。shutdownDow,一般作法是向線程發出中斷請求,因此確保提交的任務實現了正確的中斷處理邏輯。

c、ScheduleExecutorService接口,繼承自excutorServer接口:支持任務的延遲執行和按期執行,能夠執行Callable或Runnable。

schedule(),調度一個任務在延遲若干時間以後執行;

scheduleAtFixedRate():在初始延遲後,每隔一段時間循環執行;在下一次執行開始時,上一次執行可能還未結束。(同一時間,可能有多個)

scheduleWithFixedDelay:同上,只是在上一次任務執行完後,通過給定的間隔時間再開始下一次執行。(同一時間,只有一個)

以上三個方法都返回ScheduledFuture接口的實現對象。

d、CompletionService接口,共享任務執行結果。

一般在使用ExecutorService接口,經過submit提交任務,並獲得一個Future接口來獲取任務結果,若是任務提交者和執行結果的使用者是程序的不一樣部分,那就要把Future在不一樣部分進行傳遞;而CompletionService就是解決這個問題,程序不一樣部分能夠共享CompletionService,任務提交後,執行結果能夠經過take(阻塞),poll(非阻塞)來獲取。

標準庫提供的實現是 ExecutorCompletionService,在建立時,須要提供一個Executor接口的實現做爲參數,用來實際執行任務。

例:多線程方式下載文件

[java] view plain copy

  1. public class FileDownloader{

  2. // 線程池

  3. private final ExecutorService executor = Executors.newFixedThreadPool(10);

  4. public boolean download(final URL url, final Path path){

  5. Future<Path> future = executor.submit(new Callable<Path>(){ //submit提交任務

  6. public Path call(){

  7. //這裏就省略IOException的處理了

  8. InputStream is = url.openStream();

  9. Files.copy(is, path, StandardCopyOption.REPLACE_EXISTING);

  10. return path;

  11. });

  12. try{

  13. return future.get() !=null ? true : false;

  14. }<span style="font-family: Arial, Helvetica, sans-serif;">catch(InterruptedException | ExecutionException e){</span>

  15. return false;

  16. }

  17. }

  18. public void close(){//當再也不使用FileDownloader類的對象時,應該使用close方法關閉其中包含的ExecutorService接口的實現對象,不然虛擬機不會退出,佔用內存不釋放

  19. executor.shutdown();// 發出關閉請求,此時不會再接受新任務

  20. try{

  21. if(!executor.awaitTermination(3, TimeUnit.MINUTES)){// awaitTermination 來等待一段時間,使正在執行的任務或等待的任務有機會完成

  22. executor.shutdownNow();// 若是等待時間事後還有任務沒完成,則強制結束

  23. executor.awaitTermination(1, TimeUnit.MINUTES);// 再等待一段時間,使被強制結束的任務完成必要的清理工做

  24. }

  25. }catch(InterruptedException e){

  26. executor.shutdownNow();

  27. Thread.currentThread().interrupt();

  28. }

  29. }

  30. }


public class FileDownloader{



   // 線程池



   private final ExecutorService executor = Executors.newFixedThreadPool(10);



   public boolean download(final URL url, final Path path){



   Future<Path> future = executor.submit(new Callable<Path>(){ //submit提交任務



     public Path call(){



         //這裏就省略IOException的處理了



         InputStream is = url.openStream();



         Files.copy(is, path, StandardCopyOption.REPLACE_EXISTING);



         return path;



     });



     try{



         return future.get() !=null ? true : false;



     }<span style="font-family: Arial, Helvetica, sans-serif;">catch(InterruptedException | ExecutionException e){</span>



         return false;



     }



   }



   public void close(){//當再也不使用FileDownloader類的對象時,應該使用close方法關閉其中包含的ExecutorService接口的實現對象,不然虛擬機不會退出,佔用內存不釋放



     executor.shutdown();// 發出關閉請求,此時不會再接受新任務



     try{



         if(!executor.awaitTermination(3, TimeUnit.MINUTES)){// awaitTermination 來等待一段時間,使正在執行的任務或等待的任務有機會完成



           executor.shutdownNow();// 若是等待時間事後還有任務沒完成,則強制結束



           executor.awaitTermination(1, TimeUnit.MINUTES);// 再等待一段時間,使被強制結束的任務完成必要的清理工做



         }



     }catch(InterruptedException e){



         executor.shutdownNow();



         Thread.currentThread().interrupt();



     }



   }



}

10、Java SE 7 新特性

對java.util.concurrent包進行更新,增長了新的輕量級任務執行框架fork/join和多階段線程同步工具。

一、輕量級任務執行框架fork/join

這個框架的目的主要是更好地利用底層平臺上的多核和多處理器來進行並行處理。

經過分治算法或map/reduce算法來解決問題。

fork/join 類比於 map/reduce。

fork操做是把一個大的問題劃分爲若干個較小的問題,劃分過程通常爲遞歸,直到能夠直接進行計算的粒度適合的子問題;子問題在結算後,能夠獲得整個問題的部分解

join操做收集子結果,合併,獲得完整解,也多是 遞歸進行的。

相對通常的線程池實現,F/J框架的優點在任務的處理方式上。在通常線程池中,一個線程因爲某些緣由沒法運行,會等待;而在F/J,某個子問題因爲等待另一個子問題的完成而沒法繼續運行,那麼處理該子問題的線程會主動尋找其餘還沒有運行的子問題來執行。這種方式減小了等待時間,提升了性能。

爲了F/J能高效,在每一個子問題視線中應避免使用synchronized或其餘方式進行同步,也不該使用阻塞式IO或過多訪問共享變量。在理想狀況下,每一個子問題都應值進行CPU計算,只使用每一個問題的內部對象,惟一的同步應只發生在子問題和建立它的父問題之間。(這徹底就是Hadoop的MapReduce嘛)

a、ForkJoinTask類:表示一個由F/J框架執行的任務,該類實現了Future接口,能夠按照Future接口的方式來使用。(表示任務)

fork(),異步方式啓動任務的執行。

join(),等待任務完成並返回執行結果。

在建立本身的任務時,最好不要直接繼承自ForkJoinTask,而是繼承其子類,RecuriveTask或RecursiveAction,前者能夠返回結果,後者不行。

b、ForkJoinPool類:表示任務執行,實現了ExecutorService接口,除了能夠執行ForkJoinTask,也能夠執行Callable和Runnable。(任務執行)

執行任務的兩大類:

第一類:execute、invoke或submit方法:直接提交任務。

第二類:fork():運行ForkJoinTask在執行過程當中的子任務。

通常做法是表示整個問題的ForkJoinTask用第一類提交,執行過程當中產生的子任務不須要處理,ForkJoinPool會負責子任務執行。

例:查找數組中的最大值

[java] view plain copy

  1. private static class MaxValueTask extends RecursiveTask<Long>{

  2. private final long[] array;

  3. private final int start;

  4. private final int end;

  5. MaxValueTask(long[] array, int start, int end){

  6. this.array = array;

  7. this.start = start;

  8. this.end = end;

  9. }

  10. //compute是RecursiveTask的主方法

  11. protected long compute(){

  12. long max = Long.MIN_VALUE;

  13. if(end - start < RANG_LENGTH){//尋找最大值

  14. for(int i = start; i<end;i++{

  15. if(array[i] > max){

  16. max = array[i];

  17. }

  18. }

  19. }else{// 二分任務

  20. int mid = (start + end) /2;

  21. MaxValueTask lowTask = new MaxValueTask(array, start , mid);

  22. MaxValueTask highTask = new MaxValueTask(array, mid, end);

  23. lowTask.fork();// 異步啓動任務

  24. highTask.fork();

  25. max = Math.max(max, lowTask.join());//等待執行結果

  26. max = Math.max(max, highTask.join();

  27. }

  28. return max;

  29. }

  30. public Long calculate(long[] array){

  31. MaxValueTask task = new MaxValueTask(array, 0 , array.length);

  32. Long result = forkJoinPool.invoke(task);

  33. return result;

  34. }

  35. }


private static class MaxValueTask extends RecursiveTask<Long>{



   private final long[] array;



   private final int start;



   private final int end;



   MaxValueTask(long[] array, int start, int end){



     this.array = array;



     this.start = start;



     this.end = end;



   }



   //compute是RecursiveTask的主方法



   protected long compute(){



     long max = Long.MIN_VALUE;



     if(end - start < RANG_LENGTH){//尋找最大值



         for(int i = start; i<end;i++{



           if(array[i] > max){



               max = array[i];



           }



         }



     }else{// 二分任務



         int mid = (start + end) /2;



         MaxValueTask lowTask = new MaxValueTask(array, start , mid);



         MaxValueTask highTask = new MaxValueTask(array, mid, end);



         lowTask.fork();// 異步啓動任務



         highTask.fork();



         max = Math.max(max, lowTask.join());//等待執行結果



         max = Math.max(max, highTask.join();



     }



     return max;



   }



   public Long calculate(long[] array){



     MaxValueTask task = new MaxValueTask(array, 0 , array.length);



     Long result = forkJoinPool.invoke(task);



     return result;



   }



}

注:這個例子是示例,但從性能上說直接對整個數組順序比較效率高,畢竟多線程所帶來的額外開銷過大。

在實際中,F/J框架發揮做用的場合不少,好比在一個目錄包含的全部文本中搜索某個關鍵字,能夠每一個文件建立一個子任務。

若是相關的功能能夠用遞歸和分治來解決,就適合F/J。

二、多階段線程同步工具

Phaser類是Java SE 7中新增的一個使用同步工具,功能和靈活性比倒數閘門和循環屏障要強不少。

在F/J框架中的子任務之間要進行同步時,應優先考慮Phaser。

Phaser把多個線程寫做執行的任務劃分紅多個階段(phase),編程時要明確各個階段的任務,每一個階段均可以有任意個參與者,線程能夠隨時註冊並參與到某個階段,當一個階段中全部線程都成功完成以後,Phaser的onAdvance()被調用,能夠經過覆蓋添加自定義處理邏輯(相似循環屏障的使用的Runnable接口),而後Phaser類會自動進入下個階段。如此循環,知道Phaser再也不包含任何參與者。

Phaser建立後,初始階段編號爲0,構造函數中指定初始參與個數。

register(),bulkRegister(),動態添加一個或多個參與者。

arrive(),某個參與者完成任務後調用

arriveAndDeregister(),任務完成,取消本身的註冊。

arriveAndAwaitAdvance(),本身完成等待其餘參與者完成。,進入阻塞,直到Phaser成功進入下個階段。

awaitAdvance()、awaitAdvanceInterruptibly(),等待phaser進入下個階段,參數爲當前階段的編號,後者能夠設置超時和處理中斷請求。

另外,Phaser的一個重要特徵是多個Phaser能夠組成樹形結構,Phaser提供了構造方法來指定當前對象的父對象;當一個子對象參與者>0,會自動註冊到父對象中;當=0,自動解除註冊。

例:從指定網址,下載img標籤的照片

階段一、處理網址對應的html文本,和抽取img的連接;二、建立圖片下載子線程,主線程等待;三、子線程下載圖片,主線程等待;四、任務完成退出

[java] view plain copy

  1. public class WebPageImageDownloader{

  2. private final Phaser phaser = new Phaser(1);//初始參與數1,表明主線程。

  3. public void download(URL url, final Path path) throws IOException{

  4. String content = getContent(url);//得到HTML文本,省略。

  5. List<URL> imageUrls = extractImageUrls(content);//得到圖片連接,省略。

  6. for(final URL imageUrl : imageUrls){

  7. phaser.register();//子線程註冊

  8. new Thread(){

  9. public void run(){

  10. phaser.arriveAndAwaitAdvance();//第二階段的等待,等待進入第三階段

  11. try{

  12. InputStream is = imageUrl.openStream();

  13. File.copy(is, getSavePath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING);

  14. }catch(IOException e){

  15. e.printStackTrace():

  16. }finally{

  17. phaser.arriveAndDeregister();//子線程完成任務,退出。

  18. }

  19. }

  20. }.start();

  21. }

  22. phaser.arriveAndAwaitAdvance();//第二階段等待,子線程在註冊

  23. phaser.arriveAndAwaitAdvance();//第三階段等待,子線程在下載

  24. phaser.arriveAndDeregister();//全部線程退出。

  25. }

  26. }


public class WebPageImageDownloader{



   private final Phaser phaser = new Phaser(1);//初始參與數1,表明主線程。



   public void download(URL url, final Path path) throws IOException{



     String content = getContent(url);//得到HTML文本,省略。



     List<URL> imageUrls = extractImageUrls(content);//得到圖片連接,省略。



     for(final URL imageUrl : imageUrls){



         phaser.register();//子線程註冊



         new Thread(){



           public void run(){



               phaser.arriveAndAwaitAdvance();//第二階段的等待,等待進入第三階段



               try{



                 InputStream is = imageUrl.openStream();



                 File.copy(is, getSavePath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING);



               }catch(IOException e){



                 e.printStackTrace():



               }finally{



                 phaser.arriveAndDeregister();//子線程完成任務,退出。



               }



           }



       }.start();



     }



     phaser.arriveAndAwaitAdvance();//第二階段等待,子線程在註冊



     phaser.arriveAndAwaitAdvance();//第三階段等待,子線程在下載



     phaser.arriveAndDeregister();//全部線程退出。



   }



}

11、ThreadLocal類

java.lang.ThreadLocal,線程局部變量,把一個共享變量變爲一個線程的私有對象。不一樣線程訪問一個ThreadLocal類的對象時,鎖訪問和修改的事每一個線程變量各自獨立的對象。經過ThreadLocal能夠快速把一個非線程安全的對象轉換成線程安全的對象。(同時也就不能達到數據傳遞的做用了)。

a、get()和set()分別用來獲取和設置當前線程中包含的對象的值。

b、remove(),刪除。

c、initialValue(),初始化值。若是沒有經過set方法設置值,第一個調用get,會經過initValue來獲取對象的初始值。

ThreadLoacl的通常用法,建立一個ThreadLocal的匿名子類並覆蓋initalValue(),把ThreadLoacl的使用封裝在另外一個類中

[java] view plain copy

  1. public class ThreadLocalIdGenerator{

  2. private static final ThreadLocal<IdGenerator> idGenerator = new ThreadLocal<IdGenerator>(){

  3. protected IdGenerator initalValue(){

  4. return new IdGenerator();//IdGenerator 是個初始int value =0,而後getNext(){ return value++}

  5. }

  6. };

  7. public static int getNext(){

  8. return idGenerator.get().getNext();

  9. }

  10. }


public class ThreadLocalIdGenerator{



   private static final ThreadLocal<IdGenerator> idGenerator = new ThreadLocal<IdGenerator>(){



         protected IdGenerator initalValue(){



           return new IdGenerator();//IdGenerator 是個初始int value =0,而後getNext(){ return value++}



         }



     };



   public static int getNext(){



     return idGenerator.get().getNext();



   }



}

ThreadLoal的另一個做用是建立線程惟一的對象,在有些狀況,一個對象在代碼中各個部分都須要用到,傳統作法是把這個對象做爲參數在代碼間傳遞,若是使用這個對I昂的代碼都在同一個線程,能夠封裝在ThreadLocal中。

如:在多線程中,生成隨機數

java.util.Random會帶來競爭問題,java.util.concurrent.ThreadLocalRandom類提供多線程下的隨機數聲場,底層是ThreadLoacl。

總結:多線程開發中應該優先使用高層API,若是沒法知足,使用java.util.concurrent.atomic和java.util.concurrent.locks包提供的中層API,而synchronized和volatile,以及wait,notify和notifyAll等低層API 應該最後考慮。

相關文章
相關標籤/搜索