Java併發——基石篇(下)

Object wait和notify的實現機制

Java Object類提供了一個基於native實現的wait和notify線程間通信的方式,這是除了synchronized以外的另一塊獨立的併發基礎部分,有關wait和notify·的部份內容,咱們在上面分析monitor的exit的時候已經有一些涉及,可是並無過多的深刻,留下了很多的疑問,本小節會詳細分析。java

wait實現

ObjectMonitor類中的wait函數代碼實現以下:node

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
  ...
  if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
    ...
    // 拋出異常,不會直接進入等待     THROW(vmSymbols::java_lang_InterruptedException());
    ...
  }
  ...
  ObjectWaiter node(Self);
  node.TState = ObjectWaiter::TS_WAIT;
  Self->_ParkEvent->reset();
  OrderAccess::fence();

  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add");
  AddWaiter(&node);
  Thread::SpinRelease(&_WaitSetLock);

  if ((SyncFlags & 4) == 0) {
    _Responsible = NULL;
  }

  ...
  // exit the monitor   exit(true, Self); 
  ...
  if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
        // Intentionally empty       } else if (node._notified == 0) {
        if (millis <= 0) {
          Self->_ParkEvent->park();
        } else {
          ret = Self->_ParkEvent->park(millis);
        }
  }
  // 被 notify 喚醒以後的善後邏輯   ...
}

照例只列出wait函數的核心功能部分,首先會判斷一下當前線程是否爲可中斷而且是否已經被中斷,若是是的話會直接拋出InterruptedException異常,而不會進入wait等待,不然的話,就須要執行下面的等待過程,首先會根據Self當前線程新建一個ObjectWaiter對象節點,這個對象咱們在前面分析monitor的enter的視乎就已經見過了。生成一個新的節點以後就是須要將這個節點放到等待隊列中,經過調用AddWaiter函數實現node的入隊操做,不過在入隊操做以前須要得到互斥鎖以保證併發安全:linux

void Thread::SpinAcquire(volatile int * adr, const char * LockName) {
  if (Atomic::cmpxchg (1, adr, 0) == 0) {
    return;   // normal fast-path return   }

  // Slow-path : We've encountered contention -- Spin/Yield/Block strategy.   TEVENT(SpinAcquire - ctx);
  int ctr = 0;
  int Yields = 0;
  for (;;) {
    while (*adr != 0) {
      ++ctr;
      if ((ctr & 0xFFF) == 0 || !os::is_MP()) {
        if (Yields > 5) {
          os::naked_short_sleep(1);
        } else {
          os::naked_yield();
          ++Yields;
        }
      } else {
        SpinPause();
      }
    }
    if (Atomic::cmpxchg(1, adr, 0) == 0) return;
  }
}

SpinAcquire是一個自旋鎖實現,它經過一個死循環不斷經過cas檢查判斷是否得到鎖,這裏開始會經過一個cas檢查看下是否可以成功,若是成功的話就不用進行下面比較重量級的spin過程,若是獲取失敗,就須要進入下面的spin過程,這裏的spin邏輯是一個比較有意思的算法。這裏定義了一個ctr變量,其實就是counter計數器的意思,(ctr&0xFFF)==0|| !os::is_MP()這個條件比較有意思,意思是若是我嘗試的次數大於)0xfff,或者當前系統是一個單核處理器系統,那麼就執行下面的邏輯。能夠看到這裏的spin是有必定的限度的,首先開始的時候,若是是多核系統,那麼會直接執行SpinPause,咱們看下SpinPause函數的實現,這個函數是實現CPU的忙等待,所以會有不一樣系統和CPU架構的對應實現。SpinPause函數linux平臺代碼以下:算法

int SpinPause() {
    return 0;
}

即SpinPause函數直接返回0,是SpinAcquire實現CPU忙等待的一種方式,此外,若是SpinAcquire裏嘗試的次數已經到了0xFFF次的話,就利用另外一種方式實現等待:安全

if (Yields > 5) {
   os::naked_short_sleep(1);
} else {
   os::naked_yield();
   ++Yields;
}

首先會嘗試經過yield函數來將當前線程的CPU執行時間讓出來,若是讓了5次仍是沒有得到鎖,那麼就只能經過naked_short_sleep來實現等待了,這裏的naked_short_sleep函數從名字就能夠看出來是短暫休眠等待,經過每次休眠等待1ms實現。咱們如今看下naked_yield的實現方式,一樣看linux平臺的實現:架構

void os::naked_yield() {
  sched_yield();
}

能夠看到這裏的實現是直接調用pthread的sched_yield函數實現線程的時間片讓出。接下來看linux平臺naked_short_sleep的實現:併發

void os::naked_short_sleep(jlong ms) {
  struct timespec req;

  assert(ms < 1000, "Un-interruptable sleep, short time use only");
  req.tv_sec = 0;
  if (ms > 0) {
    req.tv_nsec = (ms % 1000) * 1000000;
  } else {
    req.tv_nsec = 1;
  }

  nanosleep(&req, NULL);

  return;
}

這裏咱們經過nanosleep系統調用實現線程的timed waiting。app

到這裏咱們分析一下SpinAcquire的實現邏輯:若是是單核處理器就經過yield或者sleep實現等待,若是是多核處理器的話就經過調用空實現函數來忙等待。由於若是是單核CPU的話,你經過調用空實現函數實現忙等待是不科學的,由於只有一個核,若是經過這個核來實現忙等待,那麼本來須要釋放鎖的線程得不到執行,那就可能形成飢餓等待,咱們的CPU一直在轉動,可是沒有解決任何問題。因此若是是單核CPU系統的話,咱們不能經過調用空函數來實現等待。相反,若是是多核的話,那就能夠在另外一個空閒的CPU上實現忙等待增長系統的吞吐量,能夠看到在JVM中爲了增長系統的算力和保證系統的兼容性,作了多少努力和實現。框架

上面的SpinAcquire函數返回以後,就表示咱們得到了鎖,如今能夠將咱們的node放到等待隊列中了:函數

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not add NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)   if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

這裏的實現比較簡單,就是講node插入雙向鏈表_WaitSet的尾部。插入鏈表完畢知乎,須要經過SpinRelease將鎖釋放。

將新建的node節點加入到WaitSet隊列中了,咱們接着看wait函數接下來的邏輯,如今咱們就要執行以下內容:

// exit the monitor exit(true, Self);

wait操做釋放monitor鎖就是在這裏實現的。而後接着的是wait函數的park等待。

if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
        // Intentionally empty } else if (node._notified == 0) {
    if (millis <= 0) {
        Self->_ParkEvent->park();
    } else {
        ret = Self->_ParkEvent->park(millis);
    }
}

在正式park以前,還會再一次看下是否有interruptd,若是有的話就會跳過park操做,不然就會進行park阻塞,park阻塞的時間就是wait函數調用時傳入的時間參數。
wait函數接下來的操做是park阻塞喚醒以後的善後邏輯,對於咱們的分析不是很重要,這裏就跳過。

notify實現

notify函數的實現代碼以下:

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
    TEVENT(Empty-Notify);
    return;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
  INotify(THREAD);
  OM_PERFDATA_OP(Notifications, inc(1));
}

這裏主要經過判斷WaitSet隊列中是否還有線程執行了wait,若是沒有就直接返回,若是有就對線程進行喚醒,喚醒經過調用INotify函數實現:

void ObjectMonitor::INotify(Thread * Self) {
  const int policy = Knob_MoveNotifyee;

  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");
  ObjectWaiter * iterator = DequeueWaiter();
  if (iterator != NULL) {
    ObjectWaiter * list = _EntryList;
    if (policy == 0) {
      // prepend to EntryList       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 1) {
      // append to EntryList       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 2) {
      // prepend to cxq       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 3) {
      // append to cxq       ...
    } else {
      ...
    }
    ...
  }
  Thread::SpinRelease(&_WaitSetLock);
}

能夠看到,這裏的操做都是在_WaitSetLock保護下的,首先會從WaitSet隊列中出隊一個節點,而後針對這個節點根據Knob_MoveNotifyee來決定執行不一樣的策略邏輯,而且策略中的邏輯框架就是同樣的,根據_EntryList是否爲空執行不一樣操做。Knob_MoveNottifyee默認值爲2。
notify的喚醒策略主要有如下幾種:

  1. 策略 0:將須要喚醒的 node 放到 EntryList 的頭部
  2. 策略 1:將須要喚醒的 node 放到 EntryList 的尾部
  3. 策略 2:將須要喚醒的 node 放到 CXQ 的頭部
  4. 策略 3:將須要喚醒的 node 放到 CXQ 的尾部

在分析不一樣策略的邏輯以前,咱們先看下WaitSet的出隊邏輯實現,這是INotify函數開始會執行的事:

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter   ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

從註釋中能夠看出,這裏將WaitSet隊列中的第一個node出隊,下面直接返回WaitSet隊列指針也就是隊頭,而後刪除出隊節點:

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev != NULL, "node already removed from list");
  assert(node->_next != NULL, "node already removed from list");
  // when the waiter has woken up because of interrupt,   // timeout or other spurious wake-up, dequeue the   // waiter from waiting list   ObjectWaiter* next = node->_next;
  if (next == node) {
    assert(node->_prev == node, "invariant check");
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  node->_next = NULL;
  node->_prev = NULL;
}

這樣咱們就完成了從WaitSet雙向鏈表隊列中的隊頭出隊邏輯。

喚醒策略0

if (list == NULL) {
    iterator->_next = iterator->_prev = NULL;
    _EntryList = iterator;
} else {
    list->_prev = iterator;
    iterator->_next = list;
    iterator->_prev = NULL;
    _EntryList = iterator;
}

若是EntryList爲空的話,表示以前沒有線程被notify喚醒,已經直接將當前節點放到EntryList中便可,不然的話,就將當前節點放到EntryList的頭部。

喚醒策略1

策略1和策略0邏輯很類似,這裏只是將節點放到尾部:

if (list == NULL) {
        iterator->_next = iterator->_prev = NULL;
        _EntryList = iterator;
} else {
        // CONSIDER:  finding the tail currently requires a linear-time walk of         // the EntryList.  We can make tail access constant-time by converting to         // a CDLL instead of using our current DLL.         ObjectWaiter * tail;
        for (tail = list; tail->_next != NULL; tail = tail->_next) {}
        assert(tail != NULL && tail->_next == NULL, "invariant");
        tail->_next = iterator;
        iterator->_prev = tail;
        iterator->_next = NULL;
}

喚醒策略2

if (list == NULL) {
        iterator->_next = iterator->_prev = NULL;
        _EntryList = iterator;
} else {
        iterator->TState = ObjectWaiter::TS_CXQ;
        for (;;) {
          ObjectWaiter * front = _cxq;
          iterator->_next = front;
          if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {
            break;
          }
        }
}

首先若是發現 EntryList 爲空的話,也就是第一個被 notify 喚醒的線程會進入到 EntryList,而 WaitSet 中剩下的節點會依次插入到 cxq 的頭部,而後更新 cxq 指針指向新的頭節點。

喚醒策略 3

策略3的邏輯和策略2比較類似,只是策略3會將節點放到cxq尾部:

iterator->TState = ObjectWaiter::TS_CXQ;
      for (;;) {
        ObjectWaiter * tail = _cxq;
        if (tail == NULL) {
          iterator->_next = NULL;
          if (Atomic::replace_if_null(iterator, &_cxq)) {
            break;
          }
        } else {
          while (tail->_next != NULL) tail = tail->_next;
          tail->_next = iterator;
          iterator->_prev = tail;
          iterator->_next = NULL;
          break;
        }
}

這裏不會判斷 EntryList 是否爲空,而是直接將節點放到 cxq 的尾部,這一點和前面幾個策略不同,須要注意下。

notifyAll 實現

notifyAll 的實現其實和 notify 實現大同小異:

void ObjectMonitor::notifyAll(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
    TEVENT(Empty-NotifyAll);
    return;
  }

  DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
  int tally = 0;
  while (_WaitSet != NULL) {
    tally++;
    INotify(THREAD);
  }

  OM_PERFDATA_OP(Notifications, inc(tally));
}

能夠看到,其實就是根據WaitSet長度,反覆調用INotify函數,至關於屢次調用 notify。

相關文章
相關標籤/搜索