本文同步至 http://www.waylau.com/essential-java-concurrencyhtml
計算機用戶想固然地認爲他們的系統在一個時間能夠作多件事。他們認爲,他們能夠工做在一個字處理器,而其餘應用程序在下載文件,管理打印隊列和音頻流。即便是單一的應用程序一般也是被指望在一個時間來作多件事。例如,音頻流應用程序必須同時讀取數字音頻,解壓,管理播放,並更新顯示。即便字處理器應該隨時準備響應鍵盤和鼠標事件,無論多麼繁忙,它老是能格式化文本或更新顯示。能夠作這樣的事情的軟件稱爲併發軟件(concurrent software)。java
在 Java 平臺是徹底支持併發編程。自從 5.0 版本以來,這個平臺還包括高級併發 API, 主要集中在 java.util.concurrent 包。c++
進程和線程是併發編程的兩個基本的執行單元。在 Java 中,併發編程主要涉及線程。git
一個計算機系統一般有許多活動的進程和線程。在給定的時間內,每一個處理器只能有一個線程獲得真正的運行。對於單核處理器來講,處理時間是經過時間切片來在進程和線程之間進行共享的。程序員
如今多核處理器或多進程的電腦系統愈來愈流行。這大大加強了系統的進程和線程的併發執行能力。但即使是沒有多處理器或多進程的系統中,併發仍然是可能的。github
進程有一個獨立的執行環境。進程一般有一個完整的、私人的基本運行時資源;特別是,每一個進程都有其本身的內存空間。面試
進程每每被視爲等同於程序或應用程序。然而,用戶將看到一個單獨的應用程序可能其實是一組合做的進程。大多數操做系統都支持進程間通訊( Inter Process Communication,簡稱 IPC)資源,如管道和套接字。IPC 不只用於同個系統的進程之間的通訊,也能夠用在不一樣系統的進程。算法
大多數 Java 虛擬機的實現做爲一個進程運行。Java 應用程序可使用 ProcessBuilder 對象建立額外的進程。多進程應用程序超出了本書的講解範圍。編程
線程有時被稱爲輕量級進程。進程和線程都提供一個執行環境,但建立一個新的線程比建立一個新的進程須要更少的資源。api
線程中存在於進程中,每一個進程都至少一個線程。線程共享進程的資源,包括內存和打開的文件。這使得工做變得高效,但也存在了一個潛在的問題——通訊。
多線程執行是 Java 平臺的一個重要特色。每一個應用程序都至少有一個線程,或者幾個,若是算上「系統」的線程(負責內存管理和信號處理)那就更多。但從程序員的角度來看,你啓動只有一個線程,稱爲主線程。這個線程有能力建立額外的線程。
每一個線程都與 Thread 類的一個實例相關聯。有兩種使用線程對象來建立併發應用程序的基本策略:
有兩種方式穿件 Thread 的實例:
public class HelloRunnable implements Runnable { /* (non-Javadoc) * @see java.lang.Runnable#run() */ @Override public void run() { System.out.println("Hello from a thread!"); } /** * @param args */ public static void main(String[] args) { (new Thread(new HelloRunnable())).start(); } }
public class HelloThread extends Thread { public void run() { System.out.println("Hello from a thread!"); } /** * @param args */ public static void main(String[] args) { (new HelloThread()).start(); } }
請注意,這兩個例子調用 start 來啓動線程。
第一種方式,它使用 Runnable 對象,在實際應用中更廣泛,由於 Runnable 對象能夠繼承 Thread 之外的類。第二種方式,在簡單的應用程序更容易使用,但受限於你的任務類必須是一個 Thread 的後代。本書推薦使用第一種方法,將 Runnable 任務從 Thread 對象分離來執行任務。這不只更靈活,並且它適用於高級線程管理 API。
Thread 類定義了大量的方法用於線程管理。
Thread.sleep 能夠當前線程執行暫停一個時間段,這樣處理器時間就能夠給其餘線程使用。
sleep 有兩種重載形式:一個是指定睡眠時間到毫秒,另一個是指定的睡眠時間爲納秒級。然而,這些睡眠時間不能保證是精確的,由於它們是經過由基礎 OS 提供的,並受其限制。此外,睡眠週期也能夠經過中斷終止,咱們將在後面的章節中看到。在任何狀況下,你不能假設調用 sleep 會掛起線程用於指定精確的時間段。
SleepMessages 示例使用 sleep 每隔4秒打印一次消息:
public class SleepMessages { /** * @param args */ public static void main(String[] args) throws InterruptedException { String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; for (int i = 0; i < importantInfo.length; i++) { // Pause for 4 seconds Thread.sleep(4000); // Print a message System.out.println(importantInfo[i]); } } }
請注意 main 聲明拋出 InterruptedException。當 sleep 是激活的時候,如有另外一個線程中斷當前線程時,則 sleep 拋出異常。因爲該應用程序尚未定義的另外一個線程來引發的中斷,因此考慮捕捉 InterruptedException。
中斷是代表一個線程,它應該中止它正在作和將要作事的時。線程經過在 Thread 對象調用 interrupt 來實現線程的中斷。爲了中斷機制能正常工做,被中斷的線程必須支持本身的中斷。
如何實現線程支持本身的中斷?這要看是什麼它目前正在作。若是線程頻繁調用拋出InterruptedException 的方法,它只要在 run 方法捕獲了異常以後返回便可。例如 :
for (int i = 0; i < importantInfo.length; i++) { // Pause for 4 seconds try { Thread.sleep(4000); } catch (InterruptedException e) { // We've been interrupted: no more messages. return; } // Print a message System.out.println(importantInfo[i]); }
不少方法都會拋出 InterruptedException,如 sleep,被設計成在收到中斷時當即取消他們當前的操做並返回。
若線程長時間沒有調用方法拋出 InterruptedException 的話,那麼它必須按期調用 Thread.interrupted ,在接收到中斷後返回 true。
for (int i = 0; i < inputs.length; i++) { heavyCrunch(inputs[i]); if (Thread.interrupted()) { // We've been interrupted: no more crunching. return; } }
在這個簡單的例子中,代碼簡單地測試該中斷,若是已接收到中斷線程就退出。在更復雜的應用程序,它可能會更有意義拋出一個 InterruptedException:
if (Thread.interrupted()) { throw new InterruptedException(); }
中斷機制是使用被稱爲中斷狀態的內部標誌實現的。調用 Thread.interrupt 能夠設置該標誌。當一個線程經過調用靜態方法 Thread.interrupted 檢查中斷,中斷狀態被清除。非靜態 isInterrupted 方法,它是用於線程來查詢另外一個線程的中斷狀態,不會改變中斷狀態標誌。
按照慣例,任何方法因拋出一個 InterruptedException 退出都會清除中斷狀態。固然,它可能由於另外一個線程調用 interrupt 而讓那個中斷狀態當即被從新設置。
join 方法容許一個線程等待另外一個完成。假設 t 是一個 Thread 對象,
t.join();
它會致使當前線程暫停執行直到 t 線程終止。join 容許程序員指定一個等待週期。與 sleep 同樣,等待時間是依賴於操做系統的時間,不能假設 join 等待時間是精確的。
像 sleep 同樣,join 響應中斷並經過 InterruptedException 退出。
SimpleThreads 示例,有兩個線程,第一個線程是每一個 Java 應用程序都有主線程。主線程建立的 Runnable 對象 MessageLoop,並等待它完成。若是 MessageLoop 須要很長時間才能完成,主線程就中斷它。
該 MessageLoop 線程打印出一系列消息。若是中斷以前就已經打印了全部消息,則 MessageLoop 線程打印一條消息並退出。
public class SimpleThreads { // Display a message, preceded by // the name of the current thread static void threadMessage(String message) { String threadName = Thread.currentThread().getName(); System.out.format("%s: %s%n", threadName, message); } private static class MessageLoop implements Runnable { public void run() { String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; try { for (int i = 0; i < importantInfo.length; i++) { // Pause for 4 seconds Thread.sleep(4000); // Print a message threadMessage(importantInfo[i]); } } catch (InterruptedException e) { threadMessage("I wasn't done!"); } } } public static void main(String args[]) throws InterruptedException { // Delay, in milliseconds before // we interrupt MessageLoop // thread (default one hour). long patience = 1000 * 60 * 60; // If command line argument // present, gives patience // in seconds. if (args.length > 0) { try { patience = Long.parseLong(args[0]) * 1000; } catch (NumberFormatException e) { System.err.println("Argument must be an integer."); System.exit(1); } } threadMessage("Starting MessageLoop thread"); long startTime = System.currentTimeMillis(); Thread t = new Thread(new MessageLoop()); t.start(); threadMessage("Waiting for MessageLoop thread to finish"); // loop until MessageLoop // thread exits while (t.isAlive()) { threadMessage("Still waiting..."); // Wait maximum of 1 second // for MessageLoop thread // to finish. t.join(1000); if (((System.currentTimeMillis() - startTime) > patience) && t.isAlive()) { threadMessage("Tired of waiting!"); t.interrupt(); // Shouldn't be long now // -- wait indefinitely t.join(); } } threadMessage("Finally!"); } }
線程間的通訊主要是經過共享訪問字段以及其字段所引用的對象來實現的。這種形式的通訊是很是有效的,但可能致使2種可能的錯誤:線程干擾(thread interference)和內存一致性錯誤(memory consistency errors)。同步就是要須要避免這些錯誤的工具。
可是,同步能夠引入線程競爭(thread contention),當兩個或多個線程試圖同時訪問相同的資源時,並致使了 Java 運行時執行一個或多個線程更慢,或甚至暫停他們的執行。飢餓(Starvation)和活鎖 (livelock) 是線程競爭的表現形式。
描述當多個線程訪問共享數據時是錯誤如何出現。
考慮下面的一個簡單的類 Counter:
public class Counter { private int c = 0; public void increment() { c++; } public void decrement() { c--; } public int value() { return c; } }
其中的 increment 方法用來對 c 加1;decrement 方法用來對 c 減 1。然而,有多個線程中都存在對某個 Counter 對象的引用,那麼線程間的干擾就可能致使出現咱們不想要的結果。
線程間的干擾出如今多個線程對同一個數據進行多個操做的時候,也就是出現了「交錯」。這就意味着操做是由多個步驟構成的,而此時,在這多個步驟的執行上出現了疊加。
Counter類對象的操做貌似不可能出現這種「交錯(interleave)」,由於其中的兩個關於c 的操做都很簡單,只有一條語句。然而,即便是一條語句也是會被虛擬機翻譯成多個步驟的。在這裏,咱們不深究虛擬機具體上上面的操做翻譯成了什麼樣的步驟。只須要知道即便簡單的 c++ 這樣的表達式也是會被翻譯成三個步驟的:
表達式 c-- 也是會被按照一樣的方式進行翻譯,只不過第二步變成了減1,而不是加1。
假定線程 A 中調用 increment 方法,線程 B 中調用 decrement 方法,而調用時間基本上相同。若是 c 的初始值爲 0,那麼這兩個操做的「交錯」順序可能以下:
這樣線程 A 計算的值就丟失了,也就是被線程 B 的值覆蓋了。上面的這種「交錯」只是其中的一種可能性。在不一樣的系統環境中,有多是 B 線程的結果丟失了,或者是根本就不會出現錯誤。因爲這種「交錯」是不可預測的,線程間相互干擾形成的 bug 是很難定位和修改的。
介紹了經過共享內存出現的不一致的錯誤。
內存一致性錯誤(Memory consistency errors)發生在不一樣線程對同一數據產生不一樣的「見解」。致使內存一致性錯誤的緣由很複雜,超出了本書的描述範圍。慶幸的是,程序員並不須要知道出現這些緣由的細節。咱們須要的是一種能夠避免這種錯誤的方法。
避免出現內存一致性錯誤的關鍵在於理解 happens-before 關係。這種關係是一種簡單的方法,可以確保一條語句對內存的寫操做對於其它特定的語句都是可見的。爲了理解這點,咱們能夠考慮以下的示例。假定定義了一個簡單的 int 類型的字段並對其進行了初始化:
int counter = 0;
該字段由兩個線程共享:A 和 B。假定線程 A 對 counter 進行了自增操做:
counter++;
而後,線程 B 打印 counter 的值:
System.out.println(counter);
若是以上兩條語句是在同一個線程中執行的,那麼輸出的結果天然是1。可是若是這兩條語句是在兩個不一樣的線程中,那麼輸出的結構有多是0。這是由於沒有保證線程 A 對 counter 的修改對線程 B 來講是可見的。除非程序員在這兩條語句間創建了必定的 happens-before 關係。
咱們能夠採起多種方式創建這種 happens-before 關係。使用同步就是其中之一,這點咱們將會在下面的小節中看到。
到目前爲止,咱們已經看到了兩種創建這種 happens-before 的方式:
關於哪些操做能夠創建這種 happens-before,更多的信息請參閱「java.util.concurrent 包的概要說明」。
描述了一個簡單的作法,能夠有效防止線程干擾和內存一致性錯誤。
Java 編程語言中提供了兩種基本的同步用語:同步方法(synchronized methods)和同步語句(synchronized statements)。同步語句相對而言更爲複雜一些,咱們將在下一小節中進行描述。本節重點討論同步方法。
咱們只須要在聲明方法的時候增長關鍵字 synchronized 便可:
public class SynchronizedCounter { private int c = 0; public synchronized void increment() { c++; } public synchronized void decrement() { c--; } public synchronized int value() { return c; } }
若是 count 是 SynchronizedCounter 類的實例,設置其方法爲同步方法將有兩個效果:
注意:構造函數不能是 synchronized ——在構造函數前使用 synchronized 關鍵字將致使語義錯誤。同步構造函數是沒有意義的。這是由於只有建立該對象的線程才能調用其構造函數。
警告:在建立多個線程共享的對象時,要特別當心對該對象的引用不能過早地「泄露」。例如,假定咱們想要維護一個保存類的全部實例的列表 instances。咱們可能會在構造函數中這樣寫到:
instances.add(this);
可是,其餘線程可會在該對象的構造完成以前就訪問該對象。
同步方法是一種簡單的能夠避免線程相互干擾和內存一致性錯誤的策略:若是一個對象對多個線程都是可見的,那麼全部對該對象的變量的讀寫都應該是經過同步方法完成的(一個例外就是 final 字段,他在對象建立完成後是不能被修改的,所以,在對象建立完畢後,能夠經過非同步的方法對其進行安全的讀取)。這種策略是有效的,可是可能致使「活躍度(liveness)」問題。這點咱們會在本課程的後面進行描述。
描述了一個更通用的同步方法,並介紹了同步是如何基於內部鎖的。
同步是構建在被稱爲「內部鎖(intrinsic lock)」或者是「監視鎖(monitor lock)」的內部實體上的。(在 API 中一般被稱爲是「監視器(monitor)」。)內部鎖在兩個方面都扮演着重要的角色:保證對對象狀態訪問的排他性和創建也對象可見性相關的重要的「 happens-before。
每個對象都有一個與之相關聯動的內部鎖。按照傳統的作法,當一個線程須要對一個對象的字段進行排他性訪問並保持訪問的一致性時,他必須在訪問前先獲取該對象的內部鎖,而後才能訪問之,最後釋放該內部鎖。在線程獲取對象的內部鎖到釋放對象的內部鎖的這段時間,咱們說該線程擁有該對象的內部鎖。只要有一個線程已經擁有了一個內部鎖,其餘線程就不能再擁有該鎖了。其餘線程將會在試圖獲取該鎖的時候被阻塞了。
當一個線程釋放了一個內部鎖,那麼就會創建起該動做和後續獲取該鎖之間的 happens-before 關係。
當一個線程調用一個同步方法的時候,他就自動地得到了該方法所屬對象的內部鎖,並在方法返回的時候釋放該鎖。即便是因爲出現了沒有被捕獲的異常而致使方法返回,該鎖也會被釋放。
咱們可能會感到疑惑:當調用一個靜態的同步方法的時候會怎樣了,靜態方法是和類相關的,而不是和對象相關的。在這種狀況下,線程獲取的是該類的類對象的內部鎖。這樣對於靜態字段的方法是經過一個和類的實例的鎖相區分的另外的鎖來進行的。
另一種建立同步代碼的方式就是使用同步語句。和同步方法不一樣,使用同步語句是必須指明是要使用哪一個對象的內部鎖:
public void addName(String name) { synchronized(this) { lastName = name; nameCount++; } nameList.add(name); }
在上面的示例中,方法 addName 須要對 lastName 和 nameCount 的修改進行同步,還要避免同步調用其餘對象的方法(在同步代碼段中調用其餘對象的方法可能致使「活躍度(Liveness)」中描述的問題)。若是沒有使用同步語句,那麼將不得不使用一個單獨的,未同步的方法來完成對 nameList.add 的調用。
在改善併發性時,巧妙地使用同步語句能起到很大的幫助做用。例如,咱們假定類 MsLunch 有兩個實例字段,c1 和 c2,這兩個變量毫不會一塊兒使用。全部對這兩個變量的更新都須要進行同步。可是沒有理由阻止對 c1 的更新和對 c2 的更新出現交錯——這樣作會建立沒必要要的阻塞,進而下降併發性。此時,咱們沒有使用同步方法或者使用和this 相關的鎖,而是建立了兩個單獨的對象來提供鎖。
public class MsLunch { private long c1 = 0; private long c2 = 0; private Object lock1 = new Object(); private Object lock2 = new Object(); public void inc1() { synchronized(lock1) { c1++; } } public void inc2() { synchronized(lock2) { c2++; } } }
採用這種方式時須要特別的當心。咱們必須絕對確保相關字段的訪問交錯是徹底安全的。
回憶前面提到的:線程不能獲取已經被別的線程獲取的鎖。可是線程能夠獲取自身已經擁有的鎖。容許一個線程能重複得到同一個鎖就稱爲重入同步(reentrant synchronization)。它是這樣的一種狀況:在同步代碼中直接或者間接地調用了還有同步代碼的方法,兩個同步代碼段中使用的是同一個鎖。若是沒有重入同步,在編寫同步代碼時須要額外的當心,以免線程將本身阻塞。
介紹了不會被其餘線程干擾的作法的整體思路。
在編程中,原子性動做就是指一次性有效完成的動做。原子性動做是不能在中間中止的:要麼一次性徹底執行完畢,要麼就不執行。在動做沒有執行完畢以前,是不會產生可見結果的。
經過前面的示例,咱們已經發現了諸如 c++ 這樣的自增表達式並不屬於原子操做。即便是很是簡單的表達式也包含了複雜的動做,這些動做能夠被解釋成許多別的動做。然而,的確存在一些原子操做的:
原子性動做是不會出現交錯的,所以,使用這些原子性動做時不用考慮線程間的干擾。然而,這並不意味着能夠移除對原子操做的同步。由於內存一致性錯誤仍是有可能出現的。使用 volatile 變量能夠減小內存一致性錯誤的風險,由於任何對 volatile 變 量的寫操做都和後續對該變量的讀操做創建了 happens-before 關係。這就意味着對 volatile 類型變量的修改對於別的線程來講是可見的。更重要的是,這意味着當一個線程讀取一個 volatile 類型的變量時,他看到的不只僅是對該變量的最後一次修改,還看到了致使這種修改的代碼帶來的其餘影響。
使用簡單的原子變量訪問比經過同步代碼來訪問變量更高效,可是須要程序員的更多細心考慮,以免內存一致性錯誤。這種額外的付出是否值得徹底取決於應用程序的大小和複雜度。
一個並行應用程序的及時執行能力被稱爲它的活躍度(liveness)。本節將介紹最多見的一種活躍度的問題——死鎖,以及另外兩個活躍度的問題——飢餓和活鎖。
死鎖是指兩個或兩個以上的線程永遠被阻塞,一直等待對方的資源。
下面是一個例子。
Alphonse 和 Gaston 是朋友,都頗有禮貌。禮貌的一個嚴格的規則是,當你給一個朋友鞠躬時,你必須保持鞠躬,直到你的朋友鞠躬回給你。不幸的是,這條規則有個缺陷,那就是若是兩個朋友同一時間向對方鞠躬,那就永遠不會完了。這個示例應用程序中,死鎖模型是這樣的:
public class Deadlock { static class Friend { private final String name; public Friend(String name) { this.name = name; } public String getName() { return this.name; } public synchronized void bow(Friend bower) { System.out.format("%s: %s" + " has bowed to me!%n", this.name, bower.getName()); bower.bowBack(this); } public synchronized void bowBack(Friend bower) { System.out.format("%s: %s" + " has bowed back to me!%n", this.name, bower.getName()); } } public static void main(String[] args) { final Friend alphonse = new Friend("Alphonse"); final Friend gaston = new Friend("Gaston"); new Thread(new Runnable() { public void run() { alphonse.bow(gaston); } }).start(); new Thread(new Runnable() { public void run() { gaston.bow(alphonse); } }).start(); } }
當他們嘗試調用 bowBack 兩個線程將被阻塞。不管是哪一個線程永遠不會結束,由於每一個線程都在等待對方鞠躬。這就是死鎖了。
飢餓和活鎖雖比死鎖問題稍微不常見點,但這些是在併發軟件種每個設計師仍然可能會遇到的問題。
飢餓描述了這樣一個狀況,一個線程不能得到按期訪問共享資源,因而沒法繼續執行。這種狀況通常出如今共享資源被某些「貪婪」線程佔用,而致使資源長時間不被其餘線程可用。例如,假設一個對象提供一個同步的方法,每每須要很長時間返回。若是一個線程頻繁調用該方法,其餘線程若也須要頻繁的同步訪問同一個對象一般會被阻塞。
一個線程經常處於響應另外一個線程的動做,若是其餘線程也經常處於該線程的動做,那麼就可能出現活鎖。與死鎖、活鎖的線程同樣,程序沒法進一步執行。然而,線程是不會阻塞的,他們只是會忙於應對彼此的恢復工做。現實種的例子是,兩人面對面試圖經過一條走廊: Alphonse 移動到他的左則讓路給 Gaston ,而 Gaston 移動到他的右側想讓 Alphonse 過去,兩我的同時讓路,但其實兩人都擋住了對方沒辦法過去,他們仍然彼此阻塞。
多線程之間常常須要協同工做,最多見的方式是使用 Guarded Blocks,它循環檢查一個條件(一般初始值爲 true),直到條件發生變化才跳出循環繼續執行。在使用 Guarded Blocks 時有如下幾個步驟須要注意:
假設 guardedJoy 方法必需要等待另外一線程爲共享變量 joy 設值才能繼續執行。那麼理論上能夠用一個簡單的條件循環來實現,但在等待過程當中 guardedJoy 方法不停的檢查循環條件其實是一種資源浪費。
public void guardedJoy() { // Simple loop guard. Wastes // processor time. Don't do this! while(!joy) {} System.out.println("Joy has been achieved!"); }
更加高效的保護方法是調用 Object.wait 將當前線程掛起,直到有另外一線程發起事件通知(儘管通知的事件不必定是當前線程等待的事件)。
public synchronized void guardedJoy() { // This guard only loops once for each special event, which may not // be the event we're waiting for. while(!joy) { try { wait(); } catch (InterruptedException e) {} } System.out.println("Joy and efficiency have been achieved!"); }
注意:必定要在循環裏面調用 wait 方法,不要想固然的認爲線程喚醒後循環條件必定發生了改變。
和其餘能夠暫停線程執行的方法同樣,wait 方法會拋出 InterruptedException,在上面的例子中,由於咱們關心的是 joy 的值,因此忽略了 InterruptedException。
爲何 guardedJoy 是 synchronized 的?假設 d 是用來調用 wait 的對象,當一個線程調用 d.wait,它必需要擁有 d的內部鎖(不然會拋出異常),得到 d 的內部鎖的最簡單方法是在一個 synchronized 方法裏面調用 wait。
當一個線程調用 wait 方法時,它釋放鎖並掛起。而後另外一個線程請求並得到這個鎖並調用 Object.notifyAll 通知全部等待該鎖的線程。
public synchronized notifyJoy() { joy = true; notifyAll(); }
當第二個線程釋放這個該鎖後,第一個線程再次請求該鎖,從 wait 方法返回並繼續執行。
注意:還有另一個通知方法,notify(),它只會喚醒一個線程。但因爲它並不容許指定哪個線程被喚醒,因此通常只在大規模併發應用(即系統有大量類似任務的線程)中使用。由於對於大規模併發應用,咱們其實並不關心哪個線程被喚醒。
如今咱們使用 Guarded blocks 建立一個生產者/消費者應用。這類應用須要在兩個線程之間共享數據:生產者生產數據,消費者使用數據。兩個線程經過共享對象通訊。在這裏,線程協同工做的關鍵是:生產者發佈數據以前,消費者不可以去讀取數據;消費者沒有讀取舊數據前,生產者不能發佈新數據。
在下面的例子中,數據經過 Drop 對象共享的一系列文本消息:
public class Drop { // Message sent from producer // to consumer. private String message; // True if consumer should wait // for producer to send message, // false if producer should wait for // consumer to retrieve message. private boolean empty = true; public synchronized String take() { // Wait until message is // available. while (empty) { try { wait(); } catch (InterruptedException e) {} } // Toggle status. empty = true; // Notify producer that // status has changed. notifyAll(); return message; } public synchronized void put(String message) { // Wait until message has // been retrieved. while (!empty) { try { wait(); } catch (InterruptedException e) {} } // Toggle status. empty = false; // Store message. this.message = message; // Notify consumer that status // has changed. notifyAll(); } }
Producer 是生產者線程,發送一組消息,字符串 DONE 表示全部消息都已經發送完成。爲了模擬現實狀況,生產者線程還會在消息發送時隨機的暫停。
public class Producer implements Runnable { private Drop drop; public Producer(Drop drop) { this.drop = drop; } public void run() { String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; Random random = new Random(); for (int i = 0; i < importantInfo.length; i++) { drop.put(importantInfo[i]); try { Thread.sleep(random.nextInt(5000)); } catch (InterruptedException e) { } } drop.put("DONE"); } }
Consumer 是消費者線程,讀取消息並打印出來,直到讀取到字符串 DONE 爲止。消費者線程在消息讀取時也會隨機的暫停。
public class Consumer implements Runnable { private Drop drop; public Consumer(Drop drop) { this.drop = drop; } public void run() { Random random = new Random(); for (String message = drop.take(); !message.equals("DONE"); message = drop.take()) { System.out.format("MESSAGE RECEIVED: %s%n", message); try { Thread.sleep(random.nextInt(5000)); } catch (InterruptedException e) { } } } }
ProducerConsumerExample 是主線程,它啓動生產者線程和消費者線程。
public class ProducerConsumerExample { public static void main(String[] args) { Drop drop = new Drop(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } }
若是一個對象它被構造後其,狀態不能改變,則這個對象被認爲是不可變的(immutable )。不可變對象的好處是能夠建立簡單的、可靠的代碼。
不可變對象在併發應用種特別有用。由於他們不能改變狀態,它們不能被線程干擾所中斷或者被其餘線程觀察到內部不一致的狀態。
程序員每每不肯使用不可變對象,由於他們擔憂建立一個新的對象要比更新對象的成本要高。實際上這種開銷經常被過度高估,並且使用不可變對象所帶來的一些效率提高也抵消了這種開銷。例如:使用不可變對象下降了垃圾回收所產生的額外開銷,也減小了用來確保使用可變對象不出現併發錯誤的一些額外代碼。
接下來看一個可變對象的類,而後轉化爲一個不可變對象的類。經過這個例子說明轉化的原則以及使用不可變對象的好處。
SynchronizedRGB 是表示顏色的類,每個對象表明一種顏色,使用三個整形數表示顏色的三基色,字符串表示顏色名稱。
public class SynchronizedRGB { // Values must be between 0 and 255. private int red; private int green; private int blue; private String name; private void check(int red, int green, int blue) { if (red < 0 || red > 255 || green < 0 || green > 255 || blue < 0 || blue > 255) { throw new IllegalArgumentException(); } } public SynchronizedRGB(int red, int green, int blue, String name) { check(red, green, blue); this.red = red; this.green = green; this.blue = blue; this.name = name; } public void set(int red, int green, int blue, String name) { check(red, green, blue); synchronized (this) { this.red = red; this.green = green; this.blue = blue; this.name = name; } } public synchronized int getRGB() { return ((red << 16) | (green << 8) | blue); } public synchronized String getName() { return name; } public synchronized void invert() { red = 255 - red; green = 255 - green; blue = 255 - blue; name = "Inverse of " + name; } }
使用 SynchronizedRGB 時須要當心,避免其處於不一致的狀態。例如一個線程執行了如下代碼:
SynchronizedRGB color = new SynchronizedRGB(0, 0, 0, "Pitch Black"); ... int myColorInt = color.getRGB(); //Statement 1 String myColorName = color.getName(); //Statement 2
若是有另一個線程在 Statement 1 以後、Statement 2 以前調用了 color.set 方法,那麼 myColorInt 的值和 myColorName 的值就會不匹配。爲了不出現這樣的結果,必需要像下面這樣把這兩條語句綁定到一塊執行:
synchronized (color) { int myColorInt = color.getRGB(); String myColorName = color.getName(); }
這種不一致的問題只可能發生在可變對象上。
如下的一些建立不可變對象的簡單策略。並不是全部不可變類都徹底遵照這些規則,不過這不是編寫這些類的程序員們粗枝大葉形成的,極可能的是他們有充分的理由確保這些對象在建立後不會被修改。但這須要很是複雜細緻的分析,並不適用於初學者。
將這一策略應用到 SynchronizedRGB 有如下幾步:
通過以上這些修改後,咱們獲得了 ImmutableRGB:
public class ImmutableRGB { // Values must be between 0 and 255. final private int red; final private int green; final private int blue; final private String name; private void check(int red, int green, int blue) { if (red < 0 || red > 255 || green < 0 || green > 255 || blue < 0 || blue > 255) { throw new IllegalArgumentException(); } } public ImmutableRGB(int red, int green, int blue, String name) { check(red, green, blue); this.red = red; this.green = green; this.blue = blue; this.name = name; } public int getRGB() { return ((red << 16) | (green << 8) | blue); } public String getName() { return name; } public ImmutableRGB invert() { return new ImmutableRGB(255 - red, 255 - green, 255 - blue, "Inverse of " + name); } }
目前爲止,以前的教程都是重點講述了最初做爲 Java 平臺一部分的低級別 API。這些API 對於很是基本的任務來講已經足夠,可是對於更高級的任務就須要更高級的 API。特別是針對充分利用了當今多處理器和多核系統的大規模併發應用程序。 本章,咱們將着眼於 Java 5.0 新增的一些高級併發特徵。大多數功能已經在新的java.util.concurrent 包中實現。Java 集合框架中也定義了新的併發數據結構。
提供了能夠簡化許多併發應用的鎖的慣用法。
同步代碼依賴於一種簡單的可重入鎖。這種鎖使用簡單,但也有諸多限制。java.util.concurrent.locks 包提供了更復雜的鎖。這裏會重點關注其最基本的接口 Lock。 Lock 對象做用很是相似同步代碼使用的內部鎖。如同內部鎖,每次只有一個線程能夠得到 Lock 對象。經過關聯 Condition 對象,Lock 對象也支持 wait/notify 機制。
Lock 對象之於隱式鎖最大的優點在於,它們有能力收回得到鎖的嘗試。若是當前鎖對象不可用,或者鎖請求超時(若是超時時間已指定),tryLock 方法會收回獲取鎖的請求。若是在鎖獲取前,另外一個線程發送了一箇中斷,lockInterruptibly 方法也會收回獲取鎖的請求。
讓咱們使用 Lock 對象來解決咱們在活躍度中見到的死鎖問題。Alphonse 和 Gaston 已經把本身訓練成能注意到朋友什麼時候要鞠躬。咱們經過要求 Friend 對象在雙方鞠躬前必須先得到鎖來模擬此次改善。下面是改善後模型的源代碼 Safelock :
public class Safelock { static class Friend { private final String name; private final Lock lock = new ReentrantLock(); public Friend(String name) { this.name = name; } public String getName() { return this.name; } public boolean impendingBow(Friend bower) { Boolean myLock = false; Boolean yourLock = false; try { myLock = lock.tryLock(); yourLock = bower.lock.tryLock(); } finally { if (!(myLock && yourLock)) { if (myLock) { lock.unlock(); } if (yourLock) { bower.lock.unlock(); } } } return myLock && yourLock; } public void bow(Friend bower) { if (impendingBow(bower)) { try { System.out.format("%s: %s has" + " bowed to me!%n", this.name, bower.getName()); bower.bowBack(this); } finally { lock.unlock(); bower.lock.unlock(); } } else { System.out.format( "%s: %s started" + " to bow to me, but saw that" + " I was already bowing to" + " him.%n", this.name, bower.getName()); } } public void bowBack(Friend bower) { System.out.format("%s: %s has" + " bowed back to me!%n", this.name, bower.getName()); } } static class BowLoop implements Runnable { private Friend bower; private Friend bowee; public BowLoop(Friend bower, Friend bowee) { this.bower = bower; this.bowee = bowee; } public void run() { Random random = new Random(); for (;;) { try { Thread.sleep(random.nextInt(10)); } catch (InterruptedException e) { } bowee.bow(bower); } } } public static void main(String[] args) { final Friend alphonse = new Friend("Alphonse"); final Friend gaston = new Friend("Gaston"); new Thread(new BowLoop(alphonse, gaston)).start(); new Thread(new BowLoop(gaston, alphonse)).start(); } }
爲加載和管理線程定義了高級 API。Executors 的實現由 java.util.concurrent 包提供,提供了適合大規模應用的線程池管理。
在以前全部的例子中,Thread 對象表示的線程和 Runnable 對象表示的線程所執行的任務之間是緊耦合的。這對於小型應用程序來講沒問題,但對於大規模併發應用來講,合理的作法是將線程的建立與管理和程序的其餘部分分離開。封裝這些功能的對象就是執行器,接下來的部分將講詳細描述執行器。
在 java.util.concurrent 中包括三個執行器接口:
一般來講,指向 executor 對象的變量應被聲明爲以上三種接口之一,而不是具體的實現類
Executor 接口只有一個 execute 方法,用來替代一般建立(啓動)線程的方法。例如:r 是一個 Runnable 對象,e 是一個 Executor 對象。可使用
e.execute(r);
代替
(new Thread(r)).start();
但 execute 方法沒有定義具體的實現方式。對於不一樣的 Executor 實現,execute 方法多是建立一個新線程並當即啓動,但更有多是使用已有的工做線程運行r,或者將 r放入到隊列中等待可用的工做線程。(咱們將在線程池一節中描述工做線程。)
ExecutorService 接口在提供了 execute 方法的同時,新加了更加通用的 submit 方法。submit 方法除了和 execute 方法同樣能夠接受 Runnable 對象做爲參數,還能夠接受 Callable 對象做爲參數。使用 Callable對象能夠能使任務返還執行的結果。經過 submit 方法返回的 Future 對象能夠讀取 Callable 任務的執行結果,或是管理 Callable 任務和 Runnable 任務的狀態。 ExecutorService 也提供了批量運行 Callable 任務的方法。最後,ExecutorService 還提供了一些關閉執行器的方法。若是須要支持即時關閉,執行器所執行的任務須要正確處理中斷。
ScheduledExecutorService 擴展 ExecutorService接口並添加了 schedule 方法。調用 schedule 方法能夠在指定的延時後執行一個Runnable 或者 Callable 任務。ScheduledExecutorService 接口還定義了按照指定時間間隔按期執行任務的 scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法。
線程池是最多見的一種執行器的實現。
在 java.util.concurrent 包中多數的執行器實現都使用了由工做線程組成的線程池,工做線程獨立於所它所執行的 Runnable 任務和 Callable 任務,而且經常使用來執行多個任務。
使用工做線程可使建立線程的開銷最小化。在大規模併發應用中,建立大量的 Thread 對象會佔用佔用大量系統內存,分配和回收這些對象會產生很大的開銷。
一種最多見的線程池是固定大小的線程池。這種線程池始終有必定數量的線程在運行,若是一個線程因爲某種緣由終止運行了,線程池會自動建立一個新的線程來代替它。須要執行的任務經過一個內部隊列提交給線程,當沒有更多的工做線程能夠用來執行任務時,隊列保存額外的任務。
使用固定大小的線程池一個很重要的好處是能夠實現優雅退化(degrade gracefully)。例如一個 Web 服務器,每個 HTTP 請求都是由一個單獨的線程來處理的,若是爲每個 HTTP 都建立一個新線程,那麼當系統的開銷超出其能力時,會忽然地對全部請求都中止響應。若是限制 Web 服務器能夠建立的線程數量,那麼它就沒必要當即處理全部收到的請求,而是在有能力處理請求時才處理。
建立一個使用線程池的執行器最簡單的方法是調用 java.util.concurrent.Executors 的 newFixedThreadPool 方法。Executors 類還提供了下列一下方法:
若是上面的方法都不知足須要,能夠嘗試 java.util.concurrent.ThreadPoolExecutor 或者java.util.concurrent.ScheduledThreadPoolExecutor。
該框架是 JDK 7 中引入的併發框架。
fork/join 框架是 ExecutorService 接口的一種具體實現,目的是爲了幫助你更好地利用多處理器帶來的好處。它是爲那些可以被遞歸地拆解成子任務的工做類型量身設計的。其目的在於可以使用全部可用的運算能力來提高你的應用的性能。
相似於 ExecutorService 接口的其餘實現,fork/join 框架會將任務分發給線程池中的工做線程。fork/join 框架的獨特之處在與它使用工做竊取(work-stealing)算法。完成本身的工做而處於空閒的工做線程可以從其餘仍然處於忙碌(busy)狀態的工做線程處竊取等待執行的任務。
fork/join 框架的核心是 ForkJoinPool 類,它是對 AbstractExecutorService 類的擴展。ForkJoinPool 實現了工做竊取算法,並能夠執行 ForkJoinTask 任務。
使用 fork/join 框架的第一步是編寫執行一部分工做的代碼。你的代碼結構看起來應該與下面所示的僞代碼相似:
if (my portion of the work is small enough) do the work directly else split my work into two pieces invoke the two pieces and wait for the results
翻譯爲中文爲:
if (當前這個任務工做量足夠小) 直接完成這個任務 else 將這個任務或這部分工做分解成兩個部分 分別觸發(invoke)這兩個子任務的執行,並等待結果
你須要將這段代碼包裹在一個 ForkJoinTask 的子類中。不過,一般狀況下會使用一種更爲具體的的類型,或者是 RecursiveTask(會返回一個結果),或者是 RecursiveAction。 當你的 ForkJoinTask 子類準備好了,建立一個表明全部須要完成工做的對象,而後將其做爲參數傳遞給一個ForkJoinPool 實例的 invoke() 方法便可。
想要了解 fork/join 框架的基本工做原理,接下來的這個例子會有所幫助。假設你想要模糊一張圖片。原始的 source 圖片由一個整數的數組表示,每一個整數表示一個像素點的顏色數值。與 source 圖片相同,模糊以後的 destination 圖片也由一個整數數組表示。 對圖片的模糊操做是經過對 source 數組中的每個像素點進行處理完成的。處理的過程是這樣的:將每一個像素點的色值取出,與周圍像素的色值(紅、黃、藍三個組成部分)放在一塊兒取平均值,獲得的結果被放入 destination 數組。由於一張圖片會由一個很大的數組來表示,這個流程會花費一段較長的時間。若是使用 fork/join 框架來實現這個模糊算法,你就可以藉助多處理器系統的並行處理能力。下面是上述算法結合 fork/join 框架的一種簡單實現:
public class ForkBlur extends RecursiveAction { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; // Processing window size; should be odd. private int mBlurWidth = 15; public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // Calculate average. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float)((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float)((pixel & 0x000000ff) >> 0) / mBlurWidth; } // Reassemble destination pixel. int dpixel = (0xff000000 ) | (((int)rt) << 16) | (((int)gt) << 8) | (((int)bt) << 0); mDestination[index] = dpixel; } } ...
接下來你須要實現父類中的 compute() 方法,它會直接執行模糊處理,或者將當前的工做拆分紅兩個更小的任務。數組的長度能夠做爲一個簡單的閥值來判斷任務是應該直接完成仍是應該被拆分。
protected static int sThreshold = 100000; protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); }
若是前面這個方法是在一個 RecursiveAction 的子類中,那麼設置任務在ForkJoinPool 中執行就再直觀不過了。一般會包含如下一些步驟:
建立一個表示全部須要完成工做的任務。
// source image pixels are in src // destination image pixels are in dst ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
建立將要用來執行任務的 ForkJoinPool。
ForkJoinPool pool = new ForkJoinPool();
執行任務。
pool.invoke(fb);
想要瀏覽完成的源代碼,請查看 ForkBlur示例,其中還包含一些建立 destination 圖片文件的額外代碼。
除了可以使用 fork/join 框架來實現可以在多處理系統中被並行執行的定製化算法(如前文中的 ForkBlur.java 例子),在 Java SE 中一些比較經常使用的功能點也已經使用 fork/join 框架來實現了。在 Java SE 8 中,java.util.Arrays 類的一系列parallelSort() 方法就使用了 fork/join 來實現。這些方法與 sort() 方法很相似,可是經過使用 fork/join框 架,藉助了併發來完成相關工做。在多處理器系統中,對大數組的並行排序會比串行排序更快。這些方法到底是如何運用 fork/join 框架並不在本教程的討論範圍內。想要了解更多的信息,請參見 Java API 文檔。 其餘採用了 fork/join 框架的方法還包括java.util.streams包中的一些方法,此包是做爲 Java SE 8 發行版中 Project Lambda 的一部分。想要了解更多信息,請參見 Lambda 表達式一節。
併發集合簡化了大型數據集合管理,且極大的減小了同步的需求。
java.util.concurrent 包囊括了 Java 集合框架的一些附加類。它們也最容易按照集合類所提供的接口來進行分類:
全部這些集合,經過在集合裏新增對象和訪問或移除對象的操做之間,定義一個happens-before 的關係,來幫助程序員避免內存一致性錯誤。
java.util.concurrent.atomic 包定義了對單一變量進行原子操做的類。全部的類都提供了 get 和 set 方法,可使用它們像讀寫 volatile 變量同樣讀寫原子類。就是說,同一變量上的一個 set 操做對於任意後續的 get 操做存在 happens-before 關係。原子的 compareAndSet 方法也有內存一致性特色,就像應用到整型原子變量中的簡單原子算法。
爲了看看這個包如何使用,讓咱們返回到最初用於演示線程干擾的 Counter 類:
class Counter { private int c = 0; public void increment() { c++; } public void decrement() { c--; } public int value() { return c; } }
使用同步是一種使 Counter 類變得線程安全的方法,如 SynchronizedCounter:
class SynchronizedCounter { private int c = 0; public synchronized void increment() { c++; } public synchronized void decrement() { c--; } public synchronized int value() { return c; } }
對於這個簡單的類,同步是一種可接受的解決方案。可是對於更復雜的類,咱們可能想要避免沒必要要同步所帶來的活躍度影響。將 int 替換爲 AtomicInteger 容許咱們在不進行同步的狀況下阻止線程干擾,如 AtomicCounter:
import java.util.concurrent.atomic.AtomicInteger; class AtomicCounter { private AtomicInteger c = new AtomicInteger(0); public void increment() { c.incrementAndGet(); } public void decrement() { c.decrementAndGet(); } public int value() { return c.get(); } }
併發隨機數(JDK7)提供了高效的多線程生成僞隨機數的方法。
在 JDK7 中,java.util.concurrent 包含了一個至關便利的類 ThreadLocalRandom,能夠在當應用程序指望在多個線程或 ForkJoinTasks 中使用隨機數時使用。
對於併發訪問,使用 TheadLocalRandom 代替 Math.random() 能夠減小競爭,從而得到更好的性能。
你只需調用 ThreadLocalRandom.current(), 而後調用它的其中一個方法去獲取一個隨機數便可。下面是一個例子:
int r = ThreadLocalRandom.current() .nextInt(4, 77);
本章例子的源碼,能夠在 https://github.com/waylau/essential-java 中 com.waylau.essentialjava.concurrency 包下找到。