提及java的線程之間的通訊,不免會想起它,他就是 wait 、notify、notifyAlljava
是這樣的, 小明作了飯,給二月鳥吃(備註:二月鳥 是一我的名),只有一雙筷子, 小明須要先嚐一口看能不能吃, 而後再通知二月鳥吃飯,二月鳥要等小明放下筷子才能拿起筷子吃飯。用程序怎麼實現,實現方式不少 咱今天只論wait notify 其餘方式靠邊兒站node
public class WaitNotifyTest { public static void main(String[] args) { // 這是一把鎖 筷子 Object obj = new Object(); new Thread(()->{ synchronized (obj){ try { System.out.println("二月鳥來了 等着吃飯..."); obj.wait(); System.out.println("二月鳥拿到筷子吃飯嘍..."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ synchronized (obj){ try{ System.out.println("小明嘗一下能夠吃,通知你們吃飯..."); // 通知二月鳥 能夠吃飯了 obj.notify(); System.out.println("這個時候小明尚未放下筷子..."); TimeUnit.SECONDS.sleep(5); System.out.println("小明放筷子了..."); } catch (Exception e) { System.out.println("中毒了.."); } } }).start(); } } 執行結果: 二月鳥來了 等着吃飯... 小明嘗一下能夠吃,通知你們吃飯... 這個時候小明尚未放下筷子... 小明放筷子了... 二月鳥拿到筷子吃飯嘍...
仍是1中的例子,小明作完飯後,二月鳥和小月月都來吃飯了,仍是隻有一雙筷子(真窮), 這時候咱們用wait notify 試一下 你們看看 面試
public class WaitNotifyTest02 { public static void main(String[] args) { // 這是一把鎖 筷子 Object obj = new Object(); new Thread(()->{ synchronized (obj){ try { System.out.println("二月鳥來了 等着吃飯..."); obj.wait(); System.out.println("二月鳥拿到筷子吃飯嘍..."); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("二月鳥吃完了 放下筷子..."); } } }).start(); new Thread(()->{ synchronized (obj){ try { System.out.println("小月月來了 等着吃飯..."); obj.wait(); System.out.println("小月月拿到筷子吃飯嘍..."); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("小月月吃完了 放下筷子..."); } } }).start(); new Thread(()->{ synchronized (obj){ try{ System.out.println("小明嘗一下能夠吃,通知你們吃飯..."); // 通知等待吃飯的一我的(注意是一我的) 能夠吃飯了 obj.notify(); System.out.println("這個時候小明尚未放下筷子..."); TimeUnit.SECONDS.sleep(5); System.out.println("小明放筷子了..."); } catch (Exception e) { System.out.println("中毒了.."); } } }).start(); } } 執行結果: 二月鳥來了 等着吃飯... 小月月來了 等着吃飯... 小明嘗一下能夠吃,通知你們吃飯... 這個時候小明尚未放下筷子... 小明放筷子了... 二月鳥拿到筷子吃飯嘍... 二月鳥吃完了 放下筷子...
咦? 小月月怎麼不吃飯, 二月鳥放下筷子了呀! 框架
其餘都不變,notify 改成notifyAll dom
new Thread(()->{ synchronized (obj){ try{ System.out.println("小明嘗一下能夠吃,通知你們吃飯..."); // 通知等待吃飯的人,全部人 能夠吃飯了 obj.notifyAll(); System.out.println("這個時候小明尚未放下筷子..."); TimeUnit.SECONDS.sleep(5); System.out.println("小明放筷子了..."); } catch (Exception e) { System.out.println("中毒了.."); } } }).start(); 執行結果: 二月鳥來了 等着吃飯... 小月月來了 等着吃飯... 小明嘗一下能夠吃,通知你們吃飯... 這個時候小明尚未放下筷子... 小明放筷子了... 小月月拿到筷子吃飯嘍... 小月月吃完了 放下筷子... 二月鳥拿到筷子吃飯嘍... 二月鳥吃完了 放下筷子...
這裏能夠看到: 小月月竟然比二月鳥先吃到飯,這裏是由於notifyAll 是喚醒了全部人,誰搶到筷子(鎖),誰先吃(執行)ide
// millis:wait超時時間 interruptible:是否可中斷 TRAPS:調用wait的線程
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread const Self = THREAD;
JavaThread jt = Self->as_Java_thread();ui
// 這個方法就是判斷當前線程是否得到了鎖,若是不在synchronized代碼塊就會拋異常
// 看 check_owner方法
// 一堆代碼
// 貌似是申請鎖
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add");
// 這個就是把wait的線程放到一個「集合」裏 看AddWaiter方法
// 貌似是釋放鎖
// 一堆代碼
```c++ // Returns true if the specified thread owns the ObjectMonitor. // 翻譯:若是指定的線程擁有ObjectMonitor 也就是得到了鎖 就返回true // Otherwise returns false and throws IllegalMonitorStateException // 翻譯:不然返回false 而且拋出異常 IllegalMonitorStateException // (IMSE). If there is a pending exception and the specified thread // is not the owner, that exception will be replaced by the IMSE. // 這句不會翻譯了 bool ObjectMonitor::check_owner(Thread* THREAD) { void* cur = owner_raw(); if (cur == THREAD) { return true; } if (THREAD->is_lock_owned((address)cur)) { set_owner_from_BasicLock(cur, THREAD); // Convert from BasicLock* to Thread*. _recursions = 0; return true; } // 這裏拋出了異常 THROW_MSG_(vmSymbols::java_lang_IllegalMonitorStateException(), "current thread is not owner", false); }
public class WaitNotifyTest04 { public static void main(String[] args) { Object obj = new Object(); new Thread(()->{ try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } 異常信息: Exception in thread "Thread-0" java.lang.IllegalMonitorStateException at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at WaitNotifyTest04.lambda$main$0(WaitNotifyTest04.java:23) at java.lang.Thread.run(Thread.java:748)
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
// put node at end of queue (circular doubly linked list)
// 翻譯: 把node放到隊列的最後邊(循環雙向鏈表)
// 若是你不知道什麼是循環雙向鏈表 我給你畫出來
// _WaitSet是頭元素 其實玩兒過鏈表的人 這裏應該都很清楚
// 這裏所謂的頭 不是真正的頭 只是一個相對概念
// 若是是空的 那就一個waiter ,_next _prev 都指向本身
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
// 若是已經有了元素 那就把node放最後 // 而後node的_next 指向_WaitSet也就是頭元素 // node的_prev指向添加以前的頭元素 ObjectWaiter* head = _WaitSet; ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail;
循環雙向鏈表: 不知道你們有沒有據說過<a>Disruptor</a>這個框架 ,他好像就是相似的結構 ,這個框架把性能作到了極致,有時間你們能夠了解下  #### 3.1.2 hotspot notify代碼(刪減版): ```c++ void ObjectMonitor::notify(TRAPS) { CHECK_OWNER(); // 檢測是否擁有鎖 // 若是沒有wait線程 直接返回 // 這也是爲何 一個線程先notify 另外一個線程再wait 是不能喚醒的 必須是先wait的線程才能被notify // if (_WaitSet == NULL) { return; } DTRACE_MONITOR_PROBE(notify, this, object(), THREAD); // 主要看這個 INotify(THREAD); OM_PERFDATA_OP(Notifications, inc(1)); }
// Consider:
// If the lock is cool (cxq == null && succ == null) and we're on an MP system
// 翻譯: 若是鎖符合條件(cxq == null && succ == null) 而且在一個MP系統
// 說實話 我翻譯不下去了 , 主要看下邊這句
// then instead of transferring a thread from the WaitSet to the EntryList
// 翻譯:咱們將從WaitSet中取一個線程到EntryList
// EntryList 裏是啥 就是喚醒的線程集合
// we might just dequeue a thread from the WaitSet and directly unpark() it.
void ObjectMonitor::INotify(Thread Self) {
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");
// 主要看這 DequeueWaiter就是從wiatset裏取出一個 線程
// 怎麼取?從頭取 頭是誰? 第一個wiat的線程唄 看後邊代碼
ObjectWaiter iterator = DequeueWaiter();
if (iterator != NULL) {
// 一堆代碼
// 這裏須要注意 注意 注意 //1. 若是list是空 就把list指向iterator 也就是取出來那個元素 // notify 的時候 是一個 走 if if (list == NULL) { iterator->_next = iterator->_prev = NULL; _EntryList = iterator; } else { // 若是不是空 注意了 注意了 notifyAll的時候 大多數會走這裏 iterator->TState = ObjectWaiter::TS_CXQ; for (;;) { // 這裏不是很懂 _cxq貌似是指向的_EntryList的第一個元素 ObjectWaiter * front = _cxq; iterator->_next = front; // cmpxchg : 若是&_cxq 等於 front 就把iterator寫到&_cxq內存 // 這個操做時什麼呢 把頭上拿出來的waiter放到_EntryList的首元素位置 !! // 再品一下這句話 等會兒看notifyAll的時候 會用到這個知識 if (Atomic::cmpxchg(&_cxq, front, iterator) == front) { break; } } } iterator->wait_reenter_begin(this);
```c++ inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // dequeue the very first waiter // 人家都註釋了 取出第一個waiter 這就是爲何notify是按wait順序來的 ObjectWaiter* waiter = _WaitSet; if (waiter) { // 看這個方法 ↓ DequeueSpecificWaiter(waiter); } return waiter; } inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) { // when the waiter has woken up because of interrupt, // timeout or other spurious wake-up, dequeue the // waiter from waiting list ObjectWaiter* next = node->_next; // 若是_next 等於本身 那說明就一個waiter // 閉眼想:是否是一個的時候本身的_next是指向本身的 老光棍都懂,左手拉右手。。 if (next == node) { _WaitSet = NULL;//由於就一個 拿出後把waitset置空 } else { // 這波操做 就是把頭結點 毫無痕跡的取出來 // 末尾元素_next指向 第二個元素 // 第二個元素的_prev 指向末尾元素 ObjectWaiter* prev = node->_prev; next->_prev = prev; prev->_next = next; // 把_WaitSet引用到next 也就是第二個元素上 第一個元素拜拜了您嘞 if (_WaitSet == node) { _WaitSet = next; } } // _next _prev 置空 留一個node 孤零零 給你們畫一下看一眼 node->_next = NULL; node->_prev = NULL; }
// 這裏代碼簡單 就是循環調用了INotify() INotify咱們已經看過了 就是把waiterset的元素按順序取出來
// 一個一個放到EntryList的頭部
// 看我下邊圖表示
// The current implementation of notifyAll() transfers the waiters one-at-a-time
// from the waitset to the EntryList. This could be done more efficiently with a
// single bulk transfer but in practice it's not time-critical. Beware too,
// that in prepend-mode we invert the order of the waiters. Let's say that the
// waitset is "ABCD" and the EntryList is "XYZ". After a notifyAll() in prepend
// mode the waitset will be empty and the EntryList will be "DCBAXYZ".
void ObjectMonitor::notifyAll(TRAPS) {
CHECK_OWNER(); // Throws IMSE if not owner.
if (_WaitSet == NULL) {
DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
int tally = 0;
while (_WaitSet != NULL) {
OM_PERFDATA_OP(Notifications, inc(tally));
waiterset的連線我就少畫點兒 湊活看 : 看出什麼端倪沒有? waiter的順序 到了EntryList 變成了 倒敘 這也是爲何 我測試的時候,多個wait 在執行完notifyAll的時候 是倒着獲取到鎖的 ,仍是那句話 JVM沒有強制規定規則,因此不能以這個爲依據進行業務的編寫 只是大概瞭解一下實現原理。。 而已  ```java public class WaitNotifyTest05 { public static void main(String[] args) throws InterruptedException { Object obj = new Object(); for (int i = 0; i < 20; i++) { new Thread(()->{ synchronized (obj){ try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+" 獲取鎖"); } } },"線程名稱"+i).start(); } TimeUnit.SECONDS.sleep(2); new Thread(()->{ synchronized (obj){ try{ System.out.println("notifyAll "); } catch (Exception e) { System.out.println("異常了 "); } finally { obj.notifyAll(); } } }).start(); } } 輸出結果: notifyAll 線程名稱19 獲取鎖 線程名稱18 獲取鎖 線程名稱17 獲取鎖 線程名稱16 獲取鎖 線程名稱15 獲取鎖 線程名稱14 獲取鎖 線程名稱13 獲取鎖 線程名稱8 獲取鎖 線程名稱11 獲取鎖 線程名稱10 獲取鎖 線程名稱9 獲取鎖 線程名稱12 獲取鎖 線程名稱7 獲取鎖 線程名稱6 獲取鎖 線程名稱5 獲取鎖 線程名稱4 獲取鎖 線程名稱3 獲取鎖 線程名稱2 獲取鎖 線程名稱1 獲取鎖 線程名稱0 獲取鎖
public class WaitNotifyTest06 { // 存放生產數據的容器 static LinkedList<String> list= new LinkedList<>(); // 容器最大存放數 static int maxCount = 1; public static void main(String[] args) { // 生產者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是滿了 wait 等待消費者消費了 通知生產者 再生產 if (list.size() == maxCount) { try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產了元素 通知消費者 接着消費 String a = Double.valueOf(Math.random()*10000).intValue()+""; list.add(a); System.out.println("生產:"+a); list.notify(); } } },"生產者").start(); // 消費者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是沒有元素 wait if (list.size() == 0) { try{ list.wait(); } catch (Exception e) { e.printStackTrace(); } } // 消費了 通知生產者接着生產 System.out.println("消費:"+list.pop()); list.notify(); } } },"消費者").start(); } }
注意: 你們看到我這裏判斷list的大小 用的是if(lsit.size() == maxCount ) 和 if(list.size() > = 0)
一個生產者和一個消費者 貌似沒什麼大問題
那若是2個消費者呢 ?
public class WaitNotifyTest07 { // 存放生產數據的容器 static LinkedList<String> list= new LinkedList<>(); // 容器最大存放數 static int maxCount = 1; public static void main(String[] args) { // 生產者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是滿了 wait 等待消費者消費了 通知生產者 再生產 if (list.size() == maxCount) { try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產了元素 通知消費者 接着消費 String a = Double.valueOf(Math.random()*10000).intValue()+""; list.add(a); System.out.println("生產:"+a); list.notify(); } } },"生產者").start(); // 消費者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是沒有元素 wait if (list.size() == 0) { try{ list.wait(); } catch (Exception e) { e.printStackTrace(); } } // 消費了 通知生產者接着生產 System.out.println("消費01:"+list.pop()); list.notify(); } } },"消費者01").start(); // 消費者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是沒有元素 wait if (list.size() == 0) { try{ list.wait(); } catch (Exception e) { e.printStackTrace(); } } // 消費了 通知生產者接着生產 System.out.println("消費02:"+list.pop()); list.notify(); } } },"消費者02").start(); } } 異常嘍: Exception in thread "消費者02" java.util.NoSuchElementException at java.util.LinkedList.removeFirst(LinkedList.java:270) at java.util.LinkedList.pop(LinkedList.java:801) at WaitNotifyTest07.lambda$main$2(WaitNotifyTest07.java:63) at java.lang.Thread.run(Thread.java:748)
爲何會這樣呢 ?最容易想到的一條錯誤路徑是這樣的:
public class WaitNotifyTest07 { // 存放生產數據的容器 static LinkedList<String> list= new LinkedList<>(); // 容器最大存放數 static int maxCount = 1; public static void main(String[] args) { // 生產者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是滿了 wait 等待消費者消費了 通知生產者 再生產 if (list.size() == maxCount) { try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產了元素 通知消費者 接着消費 String a = Double.valueOf(Math.random()*10000).intValue()+""; list.add(a); System.out.println("生產:"+a); list.notify(); } } },"生產者").start(); // 消費者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是沒有元素 wait while (list.size() == 0) { try{ list.wait(); } catch (Exception e) { e.printStackTrace(); } } // 消費了 通知生產者接着生產 System.out.println("消費:"+list.pop()); list.notify(); } } },"消費者").start(); // 消費者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是沒有元素 wait while (list.size() == 0) { try{ list.wait(); } catch (Exception e) { e.printStackTrace(); } } // 消費了 通知生產者接着生產 System.out.println("消費02:"+list.pop()); list.notify(); } } },"消費者02").start(); } }
咦? 又有問題了。 死鎖了!!
由於一個生產者,兩個消費者 須要用notifyAll 代替notify
爲何notify會死鎖 ?隨便舉例一種狀況
public class WaitNotifyTest07 { // 存放生產數據的容器 static LinkedList<String> list= new LinkedList<>(); // 容器最大存放數 static int maxCount = 1; public static void main(String[] args) { // 生產者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是滿了 wait 等待消費者消費了 通知生產者 再生產 if (list.size() == maxCount) { try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產了元素 通知消費者 接着消費 String a = Double.valueOf(Math.random()*10000).intValue()+""; list.add(a); System.out.println("生產:"+a); list.notifyAll(); } } },"生產者").start(); // 消費者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是沒有元素 wait while (list.size() == 0) { try{ list.wait(); } catch (Exception e) { e.printStackTrace(); } } // 消費了 通知生產者接着生產 System.out.println("消費:"+list.pop()); list.notifyAll(); } } },"消費者").start(); // 消費者線程 new Thread(()->{ while (true) { synchronized (list){ // 若是沒有元素 wait while (list.size() == 0) { try{ list.wait(); } catch (Exception e) { e.printStackTrace(); } } // 消費了 通知生產者接着生產 System.out.println("消費02:"+list.pop()); list.notifyAll(); } } },"消費者02").start(); } }
注意: 以上文字 僅表明我的觀點,僅供參考,若有問題還請指出,當即立刻連滾帶爬的從被窩裏出來改正。