Java併發——基石篇(中)

Synchronized實現機制

synchronized是Java併發同步開發的基本技術,是Java語言層面提供的線程間同步手段。咱們編寫以下一段代碼:java

public class SyncTest {
    private static final Object lock = new Object();

    public static void main(String[] args) {
        int a = 0;
        synchronized (lock) {
            a++;
        }
        System.out.println("Result: " + a);
    }
}

針對其中同步部分咱們會看到以下字節碼:node

monitorenter
iinc 1 by 1
aload_2
monitorexit

這實際上是javac在編譯時將synchronized同步塊的先後插入montor進入和退出的字節碼指令,所以,咱們想探索synchronized的實現機制,就須要探索monitorenter和monitorexit指令的執行過程。算法

咱們先看一下monitorenter的代碼實現:數據結構

void TemplateTable::monitorenter() {
    ...
      // store object     __ movptr(Address(rmon, BasicObjectLock::obj_offset_in_bytes()), rax);
    // 跳轉執行 lock_object 函數     __ lock_object(rmon);
    ...
    }

這裏咱們依然只給出重點代碼部分,代碼比較長,前面有不少指令時初始化執行環境的,最後重點會跳轉lock_object函數,一樣這個函數也是有不一樣CPU平臺實現的,咱們仍是看X86平臺的:併發

// Lock object // // Args: //      rdx, c_rarg1: BasicObjectLock to be used for locking // // Kills: //      rax, rbx void InterpreterMacroAssembler::lock_object(Register lock_reg) {
    if (UseHeavyMonitors) {
        call_VM(noreg,
            CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
            lock_reg);
    } else {
        // 執行鎖優化的邏輯部分,例如:鎖粗化,鎖消除等等         // 若是一切優化措施都執行了,仍是須要進入 monitor,就執行以下,其實和上面那個 if 分支是同樣的         // Call the runtime routine for slow case         call_VM(noreg,
            CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
            lock_reg);
    }

}

這裏咱們不管如何最終都是執行InterpreterRuntime::monitorenter函數,這個函數不只僅是模板執行器會調用,解釋執行器也會執行這個,因此定義在InterpreterRuntime類下:函數

// Synchronization // // The interpreter's synchronization code is factored out so that it can // be shared by method invocation and synchronized blocks. //%note synchronization_3 //%note monitor_1 IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
  Handle h_obj(thread, elem->obj());
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation     ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
IRT_END

上面的代碼,在原始的代碼基礎上有刪減,保留了核心關鍵邏輯。即根據UseBiasedLocking這個變量分別執行fast_enter或者slow_enter的邏輯。高併發

同步鎖優化處理

同步鎖優化處理即fast_enter執行處理,下面是fast_enter函數的定義:工具

//  Fast Monitor Enter/Exit // This the fast monitor enter. The interpreter and compiler use // some assembly copies of this code. Make sure update those code // if the following function is changed. The implementation is // extremely sensitive to race condition. Be careful. void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock,
                                    bool attempt_rebias, TRAPS) {
  if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
  }

  slow_enter(obj, lock, THREAD);
}

這裏開始仍是要判斷UseBiasedLocking,若是是true的話,就針對開始執行優化邏輯,不然仍是會fall back到slow_enter的,是否是感受判斷UseBiasedLocking有點囉嗦?其實不是的,由於這個函數在不少地方都會調用,所以判斷是必須的。爲了方便接下來的代碼分析,下面我要放出OpenJDK官方wiki中針對鎖優化的原理圖:
imageoop

在解釋原理圖以前,須要介紹一下Java對象的內存佈局,由於上面圖中的實現原理就是充分利用java對象的頭完成的。Java對象在內存的結構基本分爲:對象頭和對象體,其中對象頭存儲對象特徵信息,對象體存放對象數據部分。
在OpenJDK工程中,有一個子工程叫jol,全名:java object layout,簡單易懂,就是java對象佈局的意思。這是一個工具庫,經過這個庫能夠獲取JVM中對象佈局信息,下面咱們看一下一個簡單的例子(這也是官方給的例子):佈局

public class JOLTest {
    public static void main(String[] args) {
        System.out.println(VM.current().details());
        System.out.println(ClassLayout.parseClass(A.class).toPrintable());
    }

    public static class A {
        boolean f;
    }
}

這裏經過JOL的接口來獲取類A的對象內存佈局,執行以後輸出以下內容:

# Running 64-bit HotSpot VM. # Using compressed oop with 3-bit shift. # Using compressed klass with 3-bit shift. # WARNING | Compressed references base/shifts are guessed by the experiment! # WARNING | Therefore, computed addresses are just guesses, and ARE NOT RELIABLE. # WARNING | Make sure to attach Serviceability Agent to get the reliable addresses. # Objects are 8 bytes aligned. # Field sizes by type: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes] # Array element sizes: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes] 
JOLTest$A object internals:
 OFFSET  SIZE      TYPE DESCRIPTION                               VALUE
      0    12           (object header)                           N/A
     12     1   boolean A.f                                       N/A
     13     3           (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 3 bytes external = 3 bytes total

這裏咱們看到輸出了不少信息,上面咱們類A的對象佈局以下:12byte的對象頭+1byte的對象體+3byte的填充部分。

從JVM的代碼咱們能夠看到一個對象的頭部定義:

volatile markOop _mark;
  union _metadata {
    Klass*      _klass;
    narrowKlass _compressed_klass;
  } _metadata;

能夠看到分爲兩部分:第一部分就是mark部分,官方稱之爲mark word,第二個是klass的類型指針,指向這個對象的類對象。這裏的mark word長度是一個系統字寬,在64bit系統上就是8個字節,從上面的日誌咱們能夠看到虛擬機默認使用了compressed klass,所以第二部分的union就是narrowKlass類型的,若是咱們繼續看下narrowKlass的定義就知道這是個32bit的unsigned int類型,所以將佔用4個字節,因此對象的頭部長度總體爲12字節。

Mark word用於存儲對象自身運行時的數據,如hash code、GC分代年齡等等信息,他是實現偏向鎖的關鍵。並且考慮到虛擬機的空間效率,mark word被設計成一個非固定數據結構以便在極小的空間內存存儲儘可能多的信息,他會根據對象的狀態複用本身的存儲空間。所以,mark word內存佈局定義在32bit和64bit系統中對象的佈局不一樣:

//  32 bits:
//  --------
//             hash:25 ------------>| age:4    biased_lock:1 lock:2 (normal object)
//             JavaThread*:23 epoch:2 age:4    biased_lock:1 lock:2 (biased object)
//             size:32 ------------------------------------------>| (CMS free block)
//             PromotedObject*:29 ---------->| promo_bits:3 ----->| (CMS promoted object)
//
//  64 bits:
//  --------
//  unused:25 hash:31 -->| unused:1   age:4    biased_lock:1 lock:2 (normal object)
//  JavaThread*:54 epoch:2 unused:1   age:4    biased_lock:1 lock:2 (biased object)
//  PromotedObject*:61 --------------------->| promo_bits:3 ----->| (CMS promoted object)
//  size:64 ----------------------------------------------------->| (CMS free block)
//
//  unused:25 hash:31 -->| cms_free:1 age:4    biased_lock:1 lock:2 (COOPs && normal object)
//  JavaThread*:54 epoch:2 cms_free:1 age:4    biased_lock:1 lock:2 (COOPs && biased object)
//  narrowOop:32 unused:24 cms_free:1 unused:4 promo_bits:3 ----->| (COOPs && CMS promoted object)
//  unused:21 size:35 -->| cms_free:1 unused:7 ------------------>| (COOPs && CMS free block)

咱們主要關注其中的normal object和biased object兩部分的頭定義

biased_lock lock 狀態
1 01 可偏置、但未鎖且未偏置
0 01 已解鎖、不可偏置
-- 00 輕量級鎖定
-- 01 重量級鎖定

偏置鎖即這個鎖首先假設本身被偏向的線程所持有。在單個線程連續持有鎖時,偏向鎖就起做用了。若是一個線程接二連三地獲取鎖,那麼獲取的過程當中若是沒有發生競態,那麼能夠跳過繁重的同步過程,直接就得到鎖執行,這樣能夠大大提升性能。偏向鎖是JDK1.6中引入的一項鎖優化手段,它的目的就是消除數據在無爭用的狀況下的同步操做,進一步提升運行性能。這裏也涉及了輕量級鎖,輕量級鎖也是JDK1.6引入的一個鎖優化機制,所謂輕量級是相對於使用操做系統互斥機制來實現傳統鎖而言的,在這個角度上,傳統的方式及時重量級鎖,悲觀鎖,會致使線程的狀態切換,而線程狀態的切換是一個至關重量級的操做。

inflate成爲重鎖

先看一下slow_enter函數:

// Interpreter/Compiler Slow Case // This routine is used to handle interpreter/compiler slow case // We don't need to use fast path here, because it must have been // failed in the interpreter/compiler code. void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");

  if (mark->is_neutral()) {
    // Anticipate successful CAS -- the ST of the displaced mark must     // be visible <= the ST performed by the CAS.     lock->set_displaced_header(mark);
    if (mark == obj()->cas_set_mark((markOop) lock, mark)) {
      TEVENT(slow_enter: release stacklock);
      return;
    }
    // Fall through to inflate() ...   } else if (mark->has_locker() &&
             THREAD->is_lock_owned((address)mark->locker())) {
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    lock->set_displaced_header(NULL);
    return;
  }

  // The object header will never be displaced to this lock,   // so it does not matter what the value is, except that it   // must be non-zero to avoid looking like a re-entrant lock,   // and must not look locked either.   lock->set_displaced_header(markOopDesc::unused_mark());
  ObjectSynchronizer::inflate(THREAD,
                              obj(),
                              inflate_cause_monitor_enter)->enter(THREAD);
}

這裏的執行邏輯比較簡潔,主要執行上面OpenJDK wiki中的鎖優化邏輯。首先會判斷對象鎖是否爲中立的(neutral):

bool is_neutral()  const { 
  // 這裏的 biased_lock_mask_in_place 是 7   // unlocked_value 值是 1   return (mask_bits(value(), biased_lock_mask_in_place) == unlocked_value); 
}

它的判斷標準是將mark word中最後7個bit進行掩碼運算,將獲得的值和1進行比較,若是等於1就表示對象時中立的,也就是沒有被任何線程鎖定,不然就算失敗。至於爲何是最後7個bit,是由於不管是普通對象仍是可偏置的對象,最後7個bit的格式是固定的(其餘幾種模式的對象格式不一樣)。
再回到上面的slow_enter函數,若是判斷爲中立的,也就是沒有鎖定的話,會將當前的mark word,存儲到lock指針指向的對象中,這裏的lock指針指向的就是上面提到的lock record。而後進行一個很是重要的操做,就是經過院子cas操做將這個lock指針安裝到對象mark word中,若是安裝成功就表示當前線程得到了這個對象鎖,能夠直接返回執行同步代碼塊,不然就會fall back到膨脹鎖中。

上面是判斷對象是否爲中立的邏輯,若是當線程進來發現當前的對象鎖已經被另外一個線程鎖定了。這個時候就會執行到else邏輯中:

if (mark->has_locker() &&
             THREAD->is_lock_owned((address)mark->locker())) {
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    lock->set_displaced_header(NULL);
    return;
}

若是發現當前對象已經鎖定,須要判斷下是否是當前線程本身鎖定了,由於在sysnchronized中可能再一次synchronized,這種狀況下直接返回便可。

若是上面的兩個判斷都失敗了,也就是對象被鎖定,而且鎖定線程不是當前線程,這個時候須要執行上面OpenJDK wiki中的inflate膨脹邏輯。所謂膨脹,就是根據當前鎖對象,生成一個ObjectMonitor對象,這個對象中保存了sychronized阻塞的隊列,以及實現了不一樣的隊列調度策略,下面咱們重點看一下ObjectMonitor中的enter邏輯

ObjectMonitor enter

在enter函數中,有不少判斷和優化執行的邏輯,可是核心和經過Enterl函數實際進入隊列將當前線程阻塞:

void ObjectMonitor::EnterI(TRAPS) {
  ...
  // Try the lock - TATAS   if (TryLock (Self) > 0) {
    assert(_succ != Self, "invariant");
    assert(_owner == Self, "invariant");
    assert(_Responsible != Self, "invariant");
    return;
  }
  ...
  // We try one round of spinning *before* enqueueing Self.   //   // If the _owner is ready but OFFPROC we could use a YieldTo()   // operation to donate the remainder of this thread's quantum   // to the owner.  This has subtle but beneficial affinity   // effects. 
  if (TrySpin (Self) > 0) {
    assert(_owner == Self, "invariant");
    assert(_succ != Self, "invariant");
    assert(_Responsible != Self, "invariant");
    return;
  }
  ...
  ObjectWaiter node(Self);
  // Push "Self" onto the front of the _cxq.   // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.   // Note that spinning tends to reduce the rate at which threads   // enqueue and dequeue on EntryList|cxq.   ObjectWaiter * nxt;
  for (;;) {
    node._next = nxt = _cxq;
    if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;

    // Interference - the CAS failed because _cxq changed.  Just retry.     // As an optional optimization we retry the lock.     if (TryLock (Self) > 0) {
      assert(_succ != Self, "invariant");
      assert(_owner == Self, "invariant");
      assert(_Responsible != Self, "invariant");
      return;
    }
  }
  ...
  for (;;) {
    if (TryLock(Self) > 0) break;
    ...
    if ((SyncFlags & 2) && _Responsible == NULL) {
      Atomic::replace_if_null(Self, &_Responsible);
    }
    // park self     if (_Responsible == Self || (SyncFlags & 1)) {
      TEVENT(Inflated enter - park TIMED);
      Self->_ParkEvent->park((jlong) recheckInterval);
      // Increase the recheckInterval, but clamp the value.       recheckInterval *= 8;
      if (recheckInterval > MAX_RECHECK_INTERVAL) {
        recheckInterval = MAX_RECHECK_INTERVAL;
      }
    } else {
      TEVENT(Inflated enter - park UNTIMED);
      Self->_ParkEvent->park();
    }

    if (TryLock(Self) > 0) break;
    ...
  }
  ...
  if (_Responsible == Self) {
    _Responsible = NULL;
  }
  // 善後處理,好比將當前線程從等待隊列 CXQ 中移除   ...
}

照例只保留了重要代碼。咱們先看TryLock方法:

int ObjectMonitor::TryLock(Thread * Self) {
  void * own = _owner;
  if (own != NULL) return 0;
  if (Atomic::replace_if_null(Self, &_owner)) {
    // Either guarantee _recursions == 0 or set _recursions = 0.     assert(_recursions == 0, "invariant");
    assert(_owner == Self, "invariant");
    return 1;
  }
  // The lock had been free momentarily, but we lost the race to the lock.   // Interference -- the CAS failed.   // We can either return -1 or retry.   // Retry doesn't make as much sense because the lock was just acquired.   return -1;
}

這裏邏輯很簡單,主要是嘗試經過cas操做將_owner字段設置爲Self,其中_owner表示當前ObjectMonitor對象鎖持有的線程指針,Self指向當前執行的線程。若是設置上了,表示當前線程得到了鎖,不然沒有得到。

在上面的Enterl函數中,咱們看到TryLock先後連續執行了兩次,並且代碼判斷邏輯同樣,爲何要這樣?這實際上是爲了在入隊阻塞線程以前的最後檢查,防止線程無謂的進行狀態切換。可是爲何執行兩次?其實第二次執行的註釋已經說明了,這麼作有一些微妙的親和力影響,即若是在過去一段時間內,某個線程嘗試獲取某個資源一直失敗,那麼系統在後面會傾向於將資源分配給這個線程。

若是兩次TryLock以後仍然失敗,那麼只能乖乖入隊阻塞了,在入隊以前須要建立一個ObjectWaiter對象,這個對象將當前線程的對象(注意是JavaThread對象)包裹起來,咱們看一下ObjectWaiter的定義:

class ObjectWaiter : public StackObj {
 public:
  enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ };
  enum Sorted  { PREPEND, APPEND, SORTED };
  ObjectWaiter * volatile _next;
  ObjectWaiter * volatile _prev;
  Thread*       _thread;
  jlong         _notifier_tid;
  ParkEvent *   _event;
  volatile int  _notified;
  volatile TStates TState;
  Sorted        _Sorted;           // List placement disposition   bool          _active;           // Contention monitoring is enabled  public:
  ObjectWaiter(Thread* thread);

  void wait_reenter_begin(ObjectMonitor *mon);
  void wait_reenter_end(ObjectMonitor *mon);
};

_next和_prev表明這是一個雙向隊列實現等待隊列(可是實際上,入隊操做並無造成雙向鏈表,真正造成雙向鏈表是在exit的時候)。node節點建立完畢以後會執行以下入隊操做

// Push "Self" onto the front of the _cxq.   // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.   // Note that spinning tends to reduce the rate at which threads   // enqueue and dequeue on EntryList|cxq.   ObjectWaiter * nxt;
  for (;;) {
    node._next = nxt = _cxq;
    if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;

    // Interference - the CAS failed because _cxq changed.  Just retry.     // As an optional optimization we retry the lock.     if (TryLock (Self) > 0) {
      assert(_succ != Self, "invariant");
      assert(_owner == Self, "invariant");
      assert(_Responsible != Self, "invariant");
      return;
    }
  }

註釋中說明,咱們是要將當前節點放到CXQ隊列的頭部,將節點的next指針經過cas操做指向_cxq指針就完成了入隊操做。若是入隊成功,則退出當前循環,不然再次嘗試lock,由於在高併發狀態下,cas鎖定可能會出錯失敗。

若是上面的循環退出了,就表示當前線程的node節點已經順利進入CXQ隊列了,那麼接下來須要進入另外一個循環:

for (;;) {
    if (TryLock(Self) > 0) break;
    ...
    if ((SyncFlags & 2) && _Responsible == NULL) {
      Atomic::replace_if_null(Self, &_Responsible);
    }
    // park self     if (_Responsible == Self || (SyncFlags & 1)) {
      TEVENT(Inflated enter - park TIMED);
      Self->_ParkEvent->park((jlong) recheckInterval);
      // Increase the recheckInterval, but clamp the value.       recheckInterval *= 8;
      if (recheckInterval > MAX_RECHECK_INTERVAL) {
        recheckInterval = MAX_RECHECK_INTERVAL;
      }
    } else {
      TEVENT(Inflated enter - park UNTIMED);
      Self->_ParkEvent->park();
    }

    if (TryLock(Self) > 0) break;
    ...
}

這個循環的邏輯比較簡單:

  1. 嘗試獲取鎖
  2. park當前鎖
  3. 再次嘗試獲取鎖

重點在於第2步,咱們知道synchronzed若是獲取對象鎖失敗的話,會致使當前線程被阻塞,那麼這個阻塞操做就是在這裏完成的,這裏須要注意的是,這裏須要判斷一下_Responible指針,若是這個指針爲null,表示以前對象鎖尚未等待線程,也就是說當前線程是第一個等待線程,這時候經過cas操做將_Responsible指向Self,表示當前線程是這個對象鎖的等待線程。接下來,若是當前線程是等待線程,那麼會執行一個簡單的退避算法,進行一個短期的阻塞等待。這個算法很簡單,第一次等待1ms,第二次等待8ms,第三次等待64ms,以此類推,知道等待時長的上限:MAX_RECHECK_INTERVAL,也就是說在synchronize在一個對象鎖上的線程,若是他是第一個等待線程的話,那麼他會不停的休眠,檢查鎖。反之,若是當前線程不是第一個等待線程,那麼只能執行無限期的休眠,一直等待對象鎖的exit函數執行喚醒才行。

ObjectMonitor exit

當一個線程得到對象鎖成功後,就能夠執行自定義的同步代碼塊了。執行完成以後會執行到ObjectMonitor的exit函數中,釋放當前對象鎖,方便下一個線程來獲取這個對象鎖,下面咱們逐步分析exit的實現過程。

void ObjectMonitor::exit(bool not_suspended, TRAPS) {
  for (;;) {
    ...
        ObjectWaiter * w = NULL;
    int QMode = Knob_QMode;

    if (QMode == 2 && _cxq != NULL) {
        ...
    }

    if (QMode == 3 && _cxq != NULL) {
      ...
    }

    if (QMode == 4 && _cxq != NULL) {
      ...
    }
    ...
    ExitEpilog(Self, w);
    return;
  }
}

exit函數的執行邏輯有兩步:

  1. 根據Knob_QMode的值和_cxq是否爲空執行不一樣策略
  2. 根據必定策略喚醒等待隊列的下一個線程

出隊策略0——默認策略

在exit函數中首先是根據Knob_QMode的值執行不一樣執行不一樣邏輯,而Knob_QMode的默認值爲0,它的做用主要用來指定在exit的時候EntryList和CXQ隊列之間的喚醒關係,也就是說,當EntryList和CXQ中都有等待的線程時,由於exit以後只能有一個線程獲得鎖,這個時候選擇喚醒哪一個隊列中的線程是一個值得考慮的事。而這裏的默認策略就是0。
出隊策略0表明CXQ隊列後進先出,即將cxq指針賦予_EntryList,而後經過一個循環將本來單項鍊表的CXQ鏈表變成雙向鏈表,方便後面針對CXQ鏈表進行查詢,這時候,_EntryList就是CXQ。而後交由ExitEpilog喚醒

void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
  assert(_owner == Self, "invariant");

  // Exit protocol:   // 1. ST _succ = wakee   // 2. membar #loadstore|#storestore;   // 2. ST _owner = NULL   // 3. unpark(wakee) 
  _succ = Knob_SuccEnabled ? Wakee->_thread : NULL;
  ParkEvent * Trigger = Wakee->_event;

  // Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again.   // The thread associated with Wakee may have grabbed the lock and "Wakee" may be   // out-of-scope (non-extant).   Wakee  = NULL;

  // Drop the lock   OrderAccess::release_store(&_owner, (void*)NULL);
  OrderAccess::fence();                               // ST _owner vs LD in unpark() 
  if (SafepointMechanism::poll(Self)) {
    TEVENT(unpark before SAFEPOINT);
  }

  DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
  Trigger->unpark();

  // Maintain stats and report events to JVMTI   OM_PERFDATA_OP(Parks, inc());
}

即經過park event將等待的線程喚醒,而後執行unpark函數

void os::PlatformEvent::unpark() {
  if (Atomic::xchg(1, &_event) >= 0) return;

  int status = pthread_mutex_lock(_mutex);
  int anyWaiters = _nParked;
  status = pthread_mutex_unlock(_mutex);

  if (anyWaiters != 0) {
    status = pthread_cond_signal(_cond);
    assert_status(status == 0, status, "cond_signal");
  }
}

這裏依然是經過pthread的condition signal喚醒線程,前面線程休眠是經過condition wait實現的。

出隊策略1

出隊策略1即Knob_QMnode的值修改成1,這種模式下是先進先出,即FIFO隊列行爲。這種模式下的處理是先將CXQ隊列reverse一下,而後再講新的隊頭也就是原來的隊尾賦值給_EntryList。而後按_EntryList進行喚醒。

出隊策略2

出隊策略2跟出隊策略0類似,可是他是優先執行CXQ隊列的操做,再執行_EntryList隊列的操做。即優先按CXQ進行喚醒。

出隊策略3和出隊策略4

出隊策略3和出隊策略4都是簡單的連接。出隊策略3是將CXQ放在_EntryList以後,而出隊策略4是將_EntryList放在CXQ以前。而後按新~~~~_EntryList進行喚醒。

相關文章
相關標籤/搜索