Java Object類提供了一個基於native實現的wait和notify線程間通信的方式,這是除了synchronized以外的另一塊獨立的併發基礎部分,有關wait和notify·的部份內容,咱們在上面分析monitor的exit的時候已經有一些涉及,可是並無過多的深刻,留下了很多的疑問,本小節會詳細分析。java
ObjectMonitor類中的wait函數代碼實現以下:node
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { ... if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { ... // 拋出異常,不會直接進入等待 THROW(vmSymbols::java_lang_InterruptedException()); ... } ... ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT; Self->_ParkEvent->reset(); OrderAccess::fence(); Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add"); AddWaiter(&node); Thread::SpinRelease(&_WaitSetLock); if ((SyncFlags & 4) == 0) { _Responsible = NULL; } ... // exit the monitor exit(true, Self); ... if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) { // Intentionally empty } else if (node._notified == 0) { if (millis <= 0) { Self->_ParkEvent->park(); } else { ret = Self->_ParkEvent->park(millis); } } // 被 notify 喚醒以後的善後邏輯 ... }
照例只列出wait函數的核心功能部分,首先會判斷一下當前線程是否爲可中斷而且是否已經被中斷,若是是的話會直接拋出InterruptedException異常,而不會進入wait等待,不然的話,就須要執行下面的等待過程,首先會根據Self當前線程新建一個ObjectWaiter對象節點,這個對象咱們在前面分析monitor的enter的視乎就已經見過了。生成一個新的節點以後就是須要將這個節點放到等待隊列中,經過調用AddWaiter函數實現node的入隊操做,不過在入隊操做以前須要得到互斥鎖以保證併發安全:linux
void Thread::SpinAcquire(volatile int * adr, const char * LockName) { if (Atomic::cmpxchg (1, adr, 0) == 0) { return; // normal fast-path return } // Slow-path : We've encountered contention -- Spin/Yield/Block strategy. TEVENT(SpinAcquire - ctx); int ctr = 0; int Yields = 0; for (;;) { while (*adr != 0) { ++ctr; if ((ctr & 0xFFF) == 0 || !os::is_MP()) { if (Yields > 5) { os::naked_short_sleep(1); } else { os::naked_yield(); ++Yields; } } else { SpinPause(); } } if (Atomic::cmpxchg(1, adr, 0) == 0) return; } }
SpinAcquire是一個自旋鎖實現,它經過一個死循環不斷經過cas檢查判斷是否得到鎖,這裏開始會經過一個cas檢查看下是否可以成功,若是成功的話就不用進行下面比較重量級的spin過程,若是獲取失敗,就須要進入下面的spin過程,這裏的spin邏輯是一個比較有意思的算法。這裏定義了一個ctr變量,其實就是counter計數器的意思,(ctr&0xFFF)==0|| !os::is_MP()這個條件比較有意思,意思是若是我嘗試的次數大於)0xfff,或者當前系統是一個單核處理器系統,那麼就執行下面的邏輯。能夠看到這裏的spin是有必定的限度的,首先開始的時候,若是是多核系統,那麼會直接執行SpinPause,咱們看下SpinPause函數的實現,這個函數是實現CPU的忙等待,所以會有不一樣系統和CPU架構的對應實現。SpinPause函數linux平臺代碼以下:算法
int SpinPause() { return 0; }
即SpinPause函數直接返回0,是SpinAcquire實現CPU忙等待的一種方式,此外,若是SpinAcquire裏嘗試的次數已經到了0xFFF次的話,就利用另外一種方式實現等待:安全
if (Yields > 5) { os::naked_short_sleep(1); } else { os::naked_yield(); ++Yields; }
首先會嘗試經過yield函數來將當前線程的CPU執行時間讓出來,若是讓了5次仍是沒有得到鎖,那麼就只能經過naked_short_sleep來實現等待了,這裏的naked_short_sleep函數從名字就能夠看出來是短暫休眠等待,經過每次休眠等待1ms實現。咱們如今看下naked_yield的實現方式,一樣看linux平臺的實現:架構
void os::naked_yield() { sched_yield(); }
能夠看到這裏的實現是直接調用pthread的sched_yield函數實現線程的時間片讓出。接下來看linux平臺naked_short_sleep的實現:併發
void os::naked_short_sleep(jlong ms) { struct timespec req; assert(ms < 1000, "Un-interruptable sleep, short time use only"); req.tv_sec = 0; if (ms > 0) { req.tv_nsec = (ms % 1000) * 1000000; } else { req.tv_nsec = 1; } nanosleep(&req, NULL); return; }
這裏咱們經過nanosleep系統調用實現線程的timed waiting。app
到這裏咱們分析一下SpinAcquire的實現邏輯:若是是單核處理器就經過yield或者sleep實現等待,若是是多核處理器的話就經過調用空實現函數來忙等待。由於若是是單核CPU的話,你經過調用空實現函數實現忙等待是不科學的,由於只有一個核,若是經過這個核來實現忙等待,那麼本來須要釋放鎖的線程得不到執行,那就可能形成飢餓等待,咱們的CPU一直在轉動,可是沒有解決任何問題。因此若是是單核CPU系統的話,咱們不能經過調用空函數來實現等待。相反,若是是多核的話,那就能夠在另外一個空閒的CPU上實現忙等待增長系統的吞吐量,能夠看到在JVM中爲了增長系統的算力和保證系統的兼容性,作了多少努力和實現。框架
上面的SpinAcquire函數返回以後,就表示咱們得到了鎖,如今能夠將咱們的node放到等待隊列中了:函數
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) { assert(node != NULL, "should not add NULL node"); assert(node->_prev == NULL, "node already in list"); assert(node->_next == NULL, "node already in list"); // put node at end of queue (circular doubly linked list) if (_WaitSet == NULL) { _WaitSet = node; node->_prev = node; node->_next = node; } else { ObjectWaiter* head = _WaitSet; ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; } }
這裏的實現比較簡單,就是講node插入雙向鏈表_WaitSet的尾部。插入鏈表完畢知乎,須要經過SpinRelease將鎖釋放。
將新建的node節點加入到WaitSet隊列中了,咱們接着看wait函數接下來的邏輯,如今咱們就要執行以下內容:
// exit the monitor exit(true, Self);
wait操做釋放monitor鎖就是在這裏實現的。而後接着的是wait函數的park等待。
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) { // Intentionally empty } else if (node._notified == 0) { if (millis <= 0) { Self->_ParkEvent->park(); } else { ret = Self->_ParkEvent->park(millis); } }
在正式park以前,還會再一次看下是否有interruptd,若是有的話就會跳過park操做,不然就會進行park阻塞,park阻塞的時間就是wait函數調用時傳入的時間參數。
wait函數接下來的操做是park阻塞喚醒以後的善後邏輯,對於咱們的分析不是很重要,這裏就跳過。
notify函數的實現代碼以下:
void ObjectMonitor::notify(TRAPS) { CHECK_OWNER(); if (_WaitSet == NULL) { TEVENT(Empty-Notify); return; } DTRACE_MONITOR_PROBE(notify, this, object(), THREAD); INotify(THREAD); OM_PERFDATA_OP(Notifications, inc(1)); }
這裏主要經過判斷WaitSet隊列中是否還有線程執行了wait,若是沒有就直接返回,若是有就對線程進行喚醒,喚醒經過調用INotify函數實現:
void ObjectMonitor::INotify(Thread * Self) { const int policy = Knob_MoveNotifyee; Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify"); ObjectWaiter * iterator = DequeueWaiter(); if (iterator != NULL) { ObjectWaiter * list = _EntryList; if (policy == 0) { // prepend to EntryList if (list == NULL) { ... } else { ... } } else if (policy == 1) { // append to EntryList if (list == NULL) { ... } else { ... } } else if (policy == 2) { // prepend to cxq if (list == NULL) { ... } else { ... } } else if (policy == 3) { // append to cxq ... } else { ... } ... } Thread::SpinRelease(&_WaitSetLock); }
能夠看到,這裏的操做都是在_WaitSetLock保護下的,首先會從WaitSet隊列中出隊一個節點,而後針對這個節點根據Knob_MoveNotifyee來決定執行不一樣的策略邏輯,而且策略中的邏輯框架就是同樣的,根據_EntryList是否爲空執行不一樣操做。Knob_MoveNottifyee默認值爲2。
notify的喚醒策略主要有如下幾種:
在分析不一樣策略的邏輯以前,咱們先看下WaitSet的出隊邏輯實現,這是INotify函數開始會執行的事:
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // dequeue the very first waiter ObjectWaiter* waiter = _WaitSet; if (waiter) { DequeueSpecificWaiter(waiter); } return waiter; }
從註釋中能夠看出,這裏將WaitSet隊列中的第一個node出隊,下面直接返回WaitSet隊列指針也就是隊頭,而後刪除出隊節點:
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev != NULL, "node already removed from list"); assert(node->_next != NULL, "node already removed from list"); // when the waiter has woken up because of interrupt, // timeout or other spurious wake-up, dequeue the // waiter from waiting list ObjectWaiter* next = node->_next; if (next == node) { assert(node->_prev == node, "invariant check"); _WaitSet = NULL; } else { ObjectWaiter* prev = node->_prev; assert(prev->_next == node, "invariant check"); assert(next->_prev == node, "invariant check"); next->_prev = prev; prev->_next = next; if (_WaitSet == node) { _WaitSet = next; } } node->_next = NULL; node->_prev = NULL; }
這樣咱們就完成了從WaitSet雙向鏈表隊列中的隊頭出隊邏輯。
if (list == NULL) { iterator->_next = iterator->_prev = NULL; _EntryList = iterator; } else { list->_prev = iterator; iterator->_next = list; iterator->_prev = NULL; _EntryList = iterator; }
若是EntryList爲空的話,表示以前沒有線程被notify喚醒,已經直接將當前節點放到EntryList中便可,不然的話,就將當前節點放到EntryList的頭部。
策略1和策略0邏輯很類似,這裏只是將節點放到尾部:
if (list == NULL) { iterator->_next = iterator->_prev = NULL; _EntryList = iterator; } else { // CONSIDER: finding the tail currently requires a linear-time walk of // the EntryList. We can make tail access constant-time by converting to // a CDLL instead of using our current DLL. ObjectWaiter * tail; for (tail = list; tail->_next != NULL; tail = tail->_next) {} assert(tail != NULL && tail->_next == NULL, "invariant"); tail->_next = iterator; iterator->_prev = tail; iterator->_next = NULL; }
if (list == NULL) { iterator->_next = iterator->_prev = NULL; _EntryList = iterator; } else { iterator->TState = ObjectWaiter::TS_CXQ; for (;;) { ObjectWaiter * front = _cxq; iterator->_next = front; if (Atomic::cmpxchg(iterator, &_cxq, front) == front) { break; } } }
首先若是發現 EntryList 爲空的話,也就是第一個被 notify 喚醒的線程會進入到 EntryList,而 WaitSet 中剩下的節點會依次插入到 cxq 的頭部,而後更新 cxq 指針指向新的頭節點。
策略3的邏輯和策略2比較類似,只是策略3會將節點放到cxq尾部:
iterator->TState = ObjectWaiter::TS_CXQ; for (;;) { ObjectWaiter * tail = _cxq; if (tail == NULL) { iterator->_next = NULL; if (Atomic::replace_if_null(iterator, &_cxq)) { break; } } else { while (tail->_next != NULL) tail = tail->_next; tail->_next = iterator; iterator->_prev = tail; iterator->_next = NULL; break; } }
這裏不會判斷 EntryList 是否爲空,而是直接將節點放到 cxq 的尾部,這一點和前面幾個策略不同,須要注意下。
notifyAll 的實現其實和 notify 實現大同小異:
void ObjectMonitor::notifyAll(TRAPS) { CHECK_OWNER(); if (_WaitSet == NULL) { TEVENT(Empty-NotifyAll); return; } DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD); int tally = 0; while (_WaitSet != NULL) { tally++; INotify(THREAD); } OM_PERFDATA_OP(Notifications, inc(tally)); }
能夠看到,其實就是根據WaitSet長度,反覆調用INotify函數,至關於屢次調用 notify。