上一篇文章重點嘮叨了java中協調線程間通訊的wait/notify
機制,它有力的保證了線程間通訊的安全性以及便利性。本篇將介紹wait/notify
機制的一個應用以及更多線程間通訊的內容。java
目光從廁所轉到飯館,一個飯館裏一般都有好多廚師以及好多服務員,這裏咱們把廚師稱爲生產者
,把服務員稱爲消費者
,廚師和服務員是不直接打交道的,而是在廚師作好菜以後放到窗口,服務員從窗口直接把菜端走給客人就行了,這樣會極大的提高工做效率,由於省去了生產者和消費者之間的溝通成本。從java的角度看這個事情,每個廚師就至關於一個生產者線程
,每個服務員都至關於一個消費者線程
,而放菜的窗口就至關於一個緩衝隊列
,生產者線程不斷把生產好的東西放到緩衝隊列裏,消費者線程不斷從緩衝隊列裏取東西
,畫個圖就像是這樣:程序員
現實中放菜的窗口能放的菜數量是有限的,咱們假設這個窗口只能放5個菜。那麼廚師在作完菜以後須要看一下窗口是否是滿了,若是窗口已經滿了的話,就在一旁抽根菸等待
,直到有服務員來取菜的時候通知
一下廚師窗口有了空閒,能夠放菜了,這時廚師再把本身作的菜放到窗口上去炒下一個菜。從服務員的角度來講,若是窗口是空的,那麼也去一旁抽根菸等待
,直到有廚師把菜作好了放到窗口上,而且通知
他們一下,而後再把菜端走。數組
咱們先用java抽象一下菜:安全
public class Food { private static int counter = 0; private int i; //表明生產的第幾個菜 public Food() { i = ++counter; } @Override public String toString() { return "第" + i + "個菜"; } }
每次建立Food
對象,字段i
的值都會加1,表明這是建立的第幾道菜。多線程
爲了故事的順利進行,咱們首先定義一個工具類:併發
class SleepUtil { private static Random random = new Random(); public static void randomSleep() { try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
SleepUtil
的靜態方法randomSleep
表明當前線程隨機休眠一秒內的時間。dom
而後咱們再用java定義一下廚師:ide
public class Cook extends Thread { private Queue<Food> queue; public Cook(Queue<Food> queue, String name) { super(name); this.queue = queue; } @Override public void run() { while (true) { SleepUtil.randomSleep(); //模擬廚師炒菜時間 Food food = new Food(); System.out.println(getName() + " 生產了" + food); synchronized (queue) { while (queue.size() > 4) { try { System.out.println("隊列元素超過5個,爲:" + queue.size() + " " + getName() + "抽根菸等待中"); queue.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } queue.add(food); queue.notifyAll(); } } } }
咱們說每個廚師Cook
都是一個線程,內部維護了一個名叫queue
的隊列。在run
方法中是一個死循環,表明不斷的生產Food
。他每生產一個Food
後,都要判斷queue隊
列中元素的個數是否是大於4,若是大於4的話,就調用queue.wait()
等待,若是不大於4的話,就把建立號的Food
對象放到queue
隊列中,因爲可能多個線程同時訪問queue
的各個方法,因此對這段代碼用queue
對象來加鎖保護。當向隊列添加完剛建立的Food
對象以後,就能夠通知queue
這個鎖對象關聯的等待隊列中的服務員線程們能夠繼續端菜了。工具
而後咱們再用java定義一下服務員:this
class Waiter extends Thread { private Queue<Food> queue; public Waiter(Queue<Food> queue, String name) { super(name); this.queue = queue; } @Override public void run() { while (true) { Food food; synchronized (queue) { while (queue.size() < 1) { try { System.out.println("隊列元素個數爲: " + queue.size() + "," + getName() + "抽根菸等待中"); queue.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } food = queue.remove(); System.out.println(getName() + " 獲取到:" + food); queue.notifyAll(); } SleepUtil.randomSleep(); //模擬服務員端菜時間 } } }
每一個服務員也是一個線程,和廚師同樣,都在內部維護了一個名叫queue
的隊列。在run
方法中是一個死循環,表明不斷的從隊列中取走Food
。每次在從queue
隊列中取Food
對象的時候,都須要判斷一下隊列中的元素是否小於1,若是小於1的話,就調用queue.wait()
等待,若是不小於1的話,也就是隊列裏有元素,就從隊列裏取走一個Food
對象,而且通知與queue
這個鎖對象關聯的等待隊列中的廚師線程們能夠繼續向隊列裏放入Food
對象了。
在廚師和服務員線程類都定義好了以後,咱們再建立一個Restaurant
類,來看看在餐館裏真實發生的事情:
public class Restaurant { public static void main(String[] args) { Queue<Food> queue = new LinkedList<>(); new Cook(queue, "1號廚師").start(); new Cook(queue, "2號廚師").start(); new Cook(queue, "3號廚師").start(); new Waiter(queue, "1號服務員").start(); new Waiter(queue, "2號服務員").start(); new Waiter(queue, "3號服務員").start(); } }
咱們在Restaurant
中安排了3個廚師和3個服務員,你們執行一下這個程序,會發如今若是廚師生產的過快,廚師就會等待,若是服務員端菜速度過快,服務員就會等待。可是整個過程廚師和服務員是沒有任何關係的,它們是經過隊列queue
實現了所謂的解耦。
這個過程雖然不是很複雜,可是使用中仍是須要注意一些問題:
queue
。使用同一個鎖是由於對queue
的操做只能用同一個鎖來保護,假設使用不一樣的鎖,廚師線程調用queue.add
方法,服務員線程調用queue.remove
方法,這兩個方法都不是原子操做,多線程併發執行的時候會出現不可預測的結果,因此咱們使用同一個鎖來保護對queue
這個變量的操做,這一點咱們在嘮叨設計線程安全類的時候已經強調過了。
queue
的後果就是廚師線程和服務員線程使用的是同一個等待隊列。可是同一時刻廚師線程和服務員線程不會同時在等待隊列中
,由於當廚師線程在wait
的時候,隊列裏的元素確定是5,此時服務員線程確定是不會wait
的,可是消費的過程是被鎖對象queue
保護的,因此在一個服務員線程消費了一個Food
以後,就會調用notifyAll
來喚醒等待隊列中的廚師線程們;當消費者線程在wait
的時候,隊列裏的元素確定是0,此時廚師線程確定是不會wait
的,生產的過程是被鎖對象queue
保護的,因此在一個廚師線程
生產了一個Food
對象以後,就會調用notifyAll
來喚醒等待隊列中的服務員線程們
。因此同一時刻廚師線程
和服務員線程
不會同時在等待隊列中。
SleepUtil.randomSleep()
;。咱們這裏的生產者-消費者模型是把實際使用的場景進行了簡化,真正的實際場景中生產過程和消費過程通常都會很耗時,這些耗時的操做最好不要放在同步代碼塊中,這樣會形成別的線程的長時間阻塞。若是把生產過程和消費過程都放在同步代碼塊中,也就是說在一個廚師炒菜的同時不容許別的廚師炒菜,在一個服務員端菜的同時不容許別的程序員端菜,這個顯然是不合理的,你們須要注意這一點。
以上就是wait/notify
機制的一個現實應用:生產者-消費者
模式的一個簡介。
還記得在嘮叨I/O
的時候提到的管道流麼,這些管道流就是用於在不一樣線程之間的數據傳輸,一共有四種管道流:
PipedInputStream
:管道輸入字節流PipedOutputStream
:管道輸出字節流PipedReader
:管道輸入字符流PipedWriter
:管道輸出字符流字節流和字符流的用法是差很少的,咱們下邊以字節流爲例來嘮叨一下管道流的用法。
一個線程能夠持有一個PipedInputStream
對象,這個PipedInputStream
對象在內部維護了一個字節數組,默認大小爲1024字節。它並不能單獨使用,須要與另外一個線程持有的一個PipedOutputStream
創建關聯,PipedOutputStream
往該字節數組中寫數據,PipedInputStream
從該字節數組中讀數據,從而實現兩個線程的通訊。
PipedInputStream
先看一下它的幾個構造方法:
它有一個特別重要的方法就是:
PipedOutputStream
看一下它的構造方法:
它也有一個鏈接到管道輸入流的方法:
使用示例
管道流的一般使用場景就是一個線程持有一個PipedInputStream
對象,另外一個線程持有一個PipedOutputStream
對象,而後把這兩個輸入輸出管道流經過connect
方法創建鏈接,此後從管道輸出流寫入的數據就能夠經過管道輸入流讀出,從而實現了兩個線程間的數據交換,也就是實現了線程間的通訊
:
public class PipedDemo { public static void main(String[] args){ PipedInputStream in = new PipedInputStream(); PipedOutputStream out = new PipedOutputStream(); try { in.connect(out); //將輸入流和輸出流創建關聯 } catch (IOException e) { throw new RuntimeException(e); } new ReadThread(in).start(); new WriteThread(out).start(); } } class ReadThread extends Thread { private PipedInputStream in; public ReadThread(PipedInputStream in) { this.in = in; } @Override public void run() { int i = 0; try { while ((i=in.read()) != -1) { //從輸入流讀取數據 System.out.println(i); } } catch (IOException e) { throw new RuntimeException(e); } finally { try { in.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } class WriteThread extends Thread { private PipedOutputStream out; public WriteThread(PipedOutputStream out) { this.out = out; } @Override public void run() { byte[] bytes = {1, 2, 3, 4, 5}; try { out.write(bytes); //向輸出流寫入數據 out.flush(); } catch (IOException e) { throw new RuntimeException(e); } finally { try { out.close(); } catch (IOException e) { throw new RuntimeException(e); } } } }
執行結果是:
1 2 3 4 5
咱們前邊說過這個方法,好比有代碼是這樣:
public static void main(String[] args) { Thread t = new Thread(new Runnable() { @Override public void run() { // ... 線程t執行的具體任務 } }, "t"); t.start(); t.join(); System.out.println("t線程執行完了,繼續執行main線程"); }
在main
線程中調用t.join()
,表明main線程
須要等待t線程
執行完成後才能繼續執行。也就是說,這個join方法
能夠協調各個線程之間的執行順序。它的實現其實很簡單:
public final synchronized void join() throws InterruptedException { while (isAlive()) { wait(); } }
須要注意的是,join方法
是Thread類的成員方法
。上邊例子中在main線程
中調用t.join()
的意思就是,使用Thread
對象t
做爲鎖對象
,若是t線程
還活着,就調用wait()
,把main線程
放到與t對象
關聯的等待隊列裏,直到t線程
執行結束,系統會主動調用一下t.notifyAll()
,把與t
對象關聯的等待隊列
中的線程所有移出,從而main線程
能夠繼續執行~
固然它還有兩個指定等待時間的重載方法:
java爲了方便的管理線程,對底層的操做系統的線程狀態作了一些抽象封裝,定義了以下的線程狀態:
須要注意的是:
運行/就緒
狀態,java語言中統一用RUNNABLE
狀態來表示。阻塞
狀態,java語言中用BLOCKED
、WAITING
和TIME_WAITING
這三個狀態分別表示。阻塞
狀態進行了進一步細分。對於由於獲取不到鎖而產生的阻塞稱爲BLOCKED
狀態,由於調用wait
或者join
方法而產生的阻塞稱爲WAITING
狀態,由於調用有超時時間的wait
、join
或者sleep
方法而產生的在有限時間內阻塞稱爲TIME_WAITING
狀態。你們能夠經過這個圖來詳細的看一下各個狀態之間的轉換過程:
java這麼劃分線程的狀態純屬於方便本身的管理,好比它會給在WAITING
和TIMED_WAITING
狀態的線程分別創建不一樣的隊列,來方便實施不一樣的恢復策略~因此你們也不用糾結爲啥和操做系統中定義的不同,其實操做系統中對各個狀態的線程仍然有各類細分來方便管理,若是是你去設計一個語言或者一個操做系統,你也能夠爲了本身的方便來定義一下線程的各類狀態。咱們做爲語言的使用者,首先仍是把這些狀態記住了再說哈🙄~
獲取線程執行狀態
java中定義了一個State
枚舉類型,來表示線程的狀態:
public class Thread implements Runnable { // ... 爲節省篇幅,省略其它方法和字段 public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; } }
而後又在Thread
類裏定義了一個成員方法:
public State getState() { //省略了具體的實現 }
咱們能夠經過這個getState
方法來獲取到對應的線程狀態,下邊來舉個例子獲取上邊列舉的6種狀態,爲了故事的順利發展,咱們先定義一個工具類:
public class LockUtil { public static void sleep(long mill) { try { Thread.sleep(mill); } catch (InterruptedException e) { throw new RuntimeException(e); } } public static void wait(Object obj) { try { obj.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
由於每次調用sleep
和wait
操做的時候都會有InterruptedException
的異常說明,咱們都須要try...catch
一下,會致使代碼結構會很混亂,因此咱們寫了個工具類來把InterruptedException
的異常轉爲運行時異常。注意,咱們這裏轉爲運行時異常只是爲了代碼結構清晰,真實狀況須要認真處理InterruptedException
的異常說明,具體怎麼使用咱們後邊會詳細嘮叨。
而後接着寫具體的獲取狀態的代碼:
public class ThreadStateDemo { private static Object lock = new Object(); //鎖對象 public static void main(String[] args) { Thread t = new Thread(new Runnable() { @Override public void run() { double d = 0.1; int i = 0; while (i++ < 100000) { //模仿一個耗時操做 d = d*0.3d; } SleepUtil.sleep(2000L); //休眠2秒鐘 synchronized (lock) { LockUtil.wait(lock); } synchronized (lock) { //嘗試獲取lock鎖 } } }, "t"); System.out.println("初始狀態:" + t.getState()); t.start(); System.out.println("運行一個耗時操做時的狀態:" + t.getState()); SleepUtil.sleep(1000L); System.out.println("休眠時的狀態:" + t.getState()); SleepUtil.sleep(2000L); System.out.println("wait的狀態:" + t.getState()); synchronized (lock) { lock.notifyAll(); } System.out.println("被notify後的狀態:" + t.getState()); synchronized (lock) { SleepUtil.sleep(1000L); //調用sleep方法不會釋放鎖 System.out.println("由於獲取鎖而阻塞的狀態:" + t.getState()); } } }
咱們在程序裏用了一系列的sleep
方法來控制程序的執行順序,這只是爲了簡單的說明線程的各個狀態的產生緣由,在真實環境中是不容許使用sleep方法來控制線程間的執行順序的,應該使用同步或者咱們上邊介紹的一系列線程通訊的方式。
這個程序的執行結果是:
初始狀態:NEW 運行一個耗時操做時的狀態:RUNNABLE 休眠時的狀態:TIMED_WAITING wait的狀態:WAITING 被notify後的狀態:BLOCKED 由於獲取鎖而阻塞的狀態:TERMINATED 線程的各個狀態都獲取到了哈。
wait/notify
機制的生產者-消費者模式很重要,務必認真看幾遍~PipedInputStream
對象,這個PipedInputStream
對象在內部維護了一個字節數組,默認大小爲1024字節。它並不能單獨使用,須要與另外一個線程持有的一個PipedOutputStream
創建關聯,PipedOutputStream
往該字節數組中寫數據,PipedInputStream
從該字節數組中讀數據,從而實現兩個線程的通訊。join
方法能夠實現一個線程在另外一個線程執行完畢後才繼續執行的功能。NEW
、RUNNABLE
、BLOCKED
、WAITING
、TIME_WAITING
、TERMINATED
這些線程狀態,與操做系統中的線程有一些區別:運行/就緒
狀態,java語言中統一用RUNNABLE
狀態來表示。阻塞
狀態,java語言中用BLOCKED
、WAITING
和TIME_WAITING
這三個狀態分別表示。寫文章挺累的,有時候你以爲閱讀挺流暢的,那實際上是背後無數次修改的結果。若是你以爲不錯請幫忙轉發一下,萬分感謝~