死磕Synchronized底層實現--輕量級鎖

本文爲死磕Synchronized底層實現第三篇文章,內容爲輕量級鎖實現。java

輕量級鎖並不複雜,其中不少內容在偏向鎖一文中已說起過,與本文內容會有部分重疊c++

另外輕量級鎖的背景和基本流程在概論中已有講解。強烈建議在看過兩篇文章的基礎下閱讀本文git

本系列文章將對HotSpot的synchronized鎖實現進行全面分析,內容包括偏向鎖、輕量級鎖、重量級鎖的加鎖、解鎖、鎖升級流程的原理及源碼分析,但願給在研究synchronized路上的同窗一些幫助。主要包括如下幾篇文章:github

死磕Synchronized底層實現--概論oop

死磕Synchronized底層實現--偏向鎖源碼分析

死磕Synchronized底層實現--輕量級鎖ui

死磕Synchronized底層實現--重量級鎖spa

更多文章見我的博客:github.com/farmerjohng….net

本文分爲兩個部分:prototype

1.輕量級鎖獲取流程

2.輕量級鎖釋放流程

本人看的JVM版本是jdk8u,具體版本號以及代碼能夠在這裏看到。

輕量級鎖獲取流程

下面開始輕量級鎖獲取流程分析,代碼在bytecodeInterpreter.cpp#1816

CASE(_monitorenter): {
  oop lockee = STACK_OBJECT(-1);
  ...
  if (entry != NULL) {
   ...
   // 上面省略的代碼中若是CAS操做失敗也會調用到InterpreterRuntime::monitorenter

    // traditional lightweight locking
    if (!success) {
      // 構建一個無鎖狀態的Displaced Mark Word
      markOop displaced = lockee->mark()->set_unlocked();
      // 設置到Lock Record中去
      entry->lock()->set_displaced_header(displaced);
      bool call_vm = UseHeavyMonitors;
      if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
        // 若是CAS替換不成功,表明鎖對象不是無鎖狀態,這時候判斷下是否是鎖重入
        // Is it simple recursive case?
        if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
          entry->lock()->set_displaced_header(NULL);
        } else {
          // CAS操做失敗則調用monitorenter
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
      }
    }
    UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  } else {
    istate->set_msg(more_monitors);
    UPDATE_PC_AND_RETURN(0); // Re-execute
  }
}
複製代碼

若是鎖對象不是偏向模式或已經偏向其餘線程,則successfalse。這時候會構建一個無鎖狀態的mark word設置到Lock Record中去,咱們稱Lock Record中存儲對象mark word的字段叫Displaced Mark Word

若是當前鎖的狀態不是無鎖狀態,則CAS失敗。若是這是一次鎖重入,那直接將Lock RecordDisplaced Mark Word設置爲null

咱們看個demo,在該demo中重複3次得到鎖,

synchronized(obj){
    synchronized(obj){
    	synchronized(obj){
    	}
    }
}
複製代碼

假設鎖的狀態是輕量級鎖,下圖反應了mark word和線程棧中Lock Record的狀態,能夠看到右邊線程棧中包含3個指向當前鎖對象的Lock Record。其中棧中最高位的Lock Record爲第一次獲取鎖時分配的。其Displaced Mark word的值爲鎖對象的加鎖前的mark word,以後的鎖重入會在線程棧中分配一個Displaced Mark wordnullLock Record

爲何JVM選擇在線程棧中添加Displaced Mark word爲null的Lock Record來表示重入計數呢?首先鎖重入次數是必定要記錄下來的,由於每次解鎖都須要對應一次加鎖,解鎖次數等於加鎖次數時,該鎖才真正的被釋放,也就是在解鎖時須要用到說鎖重入次數的。一個簡單的方案是將鎖重入次數記錄在對象頭的mark word中,但mark word的大小是有限的,已經存放不下該信息了。另外一個方案是隻建立一個Lock Record並在其中記錄重入次數,Hotspot沒有這樣作的緣由我猜是考慮到效率有影響:每次重入得到鎖都須要遍歷該線程的棧找到對應的Lock Record,而後修改它的值。

因此最終Hotspot選擇每次得到鎖都添加一個Lock Record來表示鎖的重入。

接下來看看InterpreterRuntime::monitorenter方法

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
  ...
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  ...
IRT_END
複製代碼

fast_enter的流程在偏向鎖一文已經分析過,若是當前是偏向模式且偏向的線程還在使用鎖,那會將鎖的mark word改成輕量級鎖的狀態,同時會將偏向的線程棧中的Lock Record修改成輕量級鎖對應的形式。代碼位置在biasedLocking.cpp#212

// 線程還存活則遍歷線程棧中全部的Lock Record
  GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
  BasicLock* highest_lock = NULL;
  for (int i = 0; i < cached_monitor_info->length(); i++) {
    MonitorInfo* mon_info = cached_monitor_info->at(i);
    // 若是能找到對應的Lock Record說明偏向的線程還在執行同步代碼塊中的代碼
    if (mon_info->owner() == obj) {
      ...
      // 須要升級爲輕量級鎖,直接修改偏向線程棧中的Lock Record。爲了處理鎖重入的case,在這裏將Lock Record的Displaced Mark Word設置爲null,第一個Lock Record會在下面的代碼中再處理
      markOop mark = markOopDesc::encode((BasicLock*) NULL);
      highest_lock = mon_info->lock();
      highest_lock->set_displaced_header(mark);
    } else {
      ...
    }
  }
  if (highest_lock != NULL) {
    // 修改第一個Lock Record爲無鎖狀態,而後將obj的mark word設置爲執行該Lock Record的指針
    highest_lock->set_displaced_header(unbiased_prototype);
    obj->release_set_mark(markOopDesc::encode(highest_lock));
    ...
  } else {
    ...
  }
複製代碼

咱們看slow_enter的流程。

void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  // 若是是無鎖狀態
  if (mark->is_neutral()) {
    //設置Displaced Mark Word並替換對象頭的mark word
    lock->set_displaced_header(mark);
    if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
      TEVENT (slow_enter: release stacklock) ;
      return ;
    }
  } else
  if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    // 若是是重入,則設置Displaced Mark Word爲null
    lock->set_displaced_header(NULL);
    return;
  }

  ...
  // 走到這一步說明已是存在多個線程競爭鎖了 須要膨脹爲重量級鎖
  lock->set_displaced_header(markOopDesc::unused_mark());
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

複製代碼

輕量級鎖釋放流程

CASE(_monitorexit): {
  oop lockee = STACK_OBJECT(-1);
  CHECK_NULL(lockee);
  // derefing's lockee ought to provoke implicit null check
  // find our monitor slot
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  // 從低往高遍歷棧的Lock Record
  while (most_recent != limit ) {
    // 若是Lock Record關聯的是該鎖對象
    if ((most_recent)->obj() == lockee) {
      BasicLock* lock = most_recent->lock();
      markOop header = lock->displaced_header();
      // 釋放Lock Record
      most_recent->set_obj(NULL);
      // 若是是偏向模式,僅僅釋放Lock Record就行了。不然要走輕量級鎖or重量級鎖的釋放流程
      if (!lockee->mark()->has_bias_pattern()) {
        bool call_vm = UseHeavyMonitors;
        // header!=NULL說明不是重入,則須要將Displaced Mark Word CAS到對象頭的Mark Word
        if (header != NULL || call_vm) {
          if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
            // CAS失敗或者是重量級鎖則會走到這裏,先將obj還原,而後調用monitorexit方法
            most_recent->set_obj(lockee);
            CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
          }
        }
      }
      //執行下一條命令
      UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
    }
    //處理下一條Lock Record
    most_recent++;
  }
  // Need to throw illegal monitor state exception
  CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
  ShouldNotReachHere();
}
複製代碼

輕量級鎖釋放時須要將Displaced Mark Word替換到對象頭的mark word中。若是CAS失敗或者是重量級鎖則進入到InterpreterRuntime::monitorexit方法中。

//%note monitor_1
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
 
  Handle h_obj(thread, elem->obj());
  ...
  ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
  // Free entry. This must be done here, since a pending exception might be installed on
  //釋放Lock Record
  elem->set_obj(NULL);
  ...
IRT_END
複製代碼

monitorexit調用完slow_exit方法後,就釋放Lock Record

void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
  fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
  ...
  markOop dhw = lock->displaced_header();
  markOop mark ;
  if (dhw == NULL) {
     // 重入鎖,什麼也不作
   	 ...
     return ;
  }

  mark = object->mark() ;

  // 若是是mark word==Displaced Mark Word即輕量級鎖,CAS替換對象頭的mark word
  if (mark == (markOop) lock) {
     assert (dhw->is_neutral(), "invariant") ;
     if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
        TEVENT (fast_exit: release stacklock) ;
        return;
     }
  }
  //走到這裏說明是重量級鎖或者解鎖時發生了競爭,膨脹後調用重量級鎖的exit方法。
  ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
複製代碼

該方法中先判斷是否是輕量級鎖,若是是輕量級鎖則將替換mark word,不然膨脹爲重量級鎖並調用exit方法,相關邏輯將在重量級鎖的文章中講解。

相關文章
相關標籤/搜索