面試題深刻解析:Synchronized底層實現

本文爲synchronized系列第二篇。主要內容爲分析偏向鎖的實現。html

偏向鎖的誕生背景和基本原理在上文中已經講過了,強烈建議在有看過上篇文章的基礎下閱讀本文java

 

本文將分爲幾塊內容:git

1.偏向鎖的入口github

2.偏向鎖的獲取流程安全

3.偏向鎖的撤銷流程多線程

4.偏向鎖的釋放流程併發

5.偏向鎖的批量重偏向和批量撤銷app

本文分析的JVM版本是JVM8,具體版本號以及代碼能夠在這裏看到。jvm

 

偏向鎖入口

目前網上的不少文章,關於偏向鎖源碼入口都找錯地方了,致使我以前對於偏向鎖的不少邏輯一直想不通,走了不少彎路。ide

synchronized分爲synchronized代碼塊和synchronized方法,其底層獲取鎖的邏輯都是同樣的,本文講解的是synchronized代碼塊的實現。上篇文章也說過,synchronized代碼塊是由monitorentermonitorexit兩個指令實現的。

關於HotSpot虛擬機中獲取鎖的入口,網上不少文章要麼給出的方法入口爲interpreterRuntime.cpp#monitorenter,要麼給出的入口爲bytecodeInterpreter.cpp#1816。包括佔小狼的這篇文章關於鎖入口的位置說法也是有問題的(固然文章仍是很好的,在我剛開始研究synchronized的時候,小狼哥的這篇文章給了我不少幫助)。

要找鎖的入口,確定是要在源碼中找到對monitorenter指令解析的地方。在HotSpot的中有兩處地方對monitorenter指令進行解析:一個是在bytecodeInterpreter.cpp#1816 ,另外一個是在templateTable_x86_64.cpp#3667

前者是JVM中的字節碼解釋器(bytecodeInterpreter),用C++實現了每條JVM指令(如monitorenterinvokevirtual等),其優勢是實現相對簡單且容易理解,缺點是執行慢。後者是模板解釋器(templateInterpreter),其對每一個指令都寫了一段對應的彙編代碼,啓動時將每一個指令與對應彙編代碼入口綁定,能夠說是效率作到了極致。模板解釋器的實現能夠看這篇文章在研究的過程當中也請教過文章做者‘汪先生’一些問題,這裏感謝一下。

在HotSpot中,只用到了模板解釋器,字節碼解釋器根本就沒用到,R大的讀書筆記中說的很清楚了,你們能夠看看,這裏再也不贅述。

因此montorenter的解析入口在模板解釋器中,其代碼位於templateTable_x86_64.cpp#3667。經過調用路徑:templateTable_x86_64#monitorenter->interp_masm_x86_64#lock_object進入到偏向鎖入口macroAssembler_x86#biased_locking_enter,在這裏你們能夠看到會生成對應的彙編代碼。須要注意的是,不是說每次解析monitorenter指令都會調用biased_locking_enter,而是隻會在JVM啓動的時候調用該方法生成彙編代碼,以後對指令的解析是經過直接執行彙編代碼。

其實bytecodeInterpreter的邏輯和templateInterpreter的邏輯是大同小異的,由於templateInterpreter中都是彙編代碼,比較晦澀,因此看bytecodeInterpreter的實現會便於理解一點。但這裏有個坑,在jdk8u以前,bytecodeInterpreter並無實現偏向鎖的邏輯。我以前看的JDK8-87ee5ee27509這個版本就沒有實現偏向鎖的邏輯,致使我看了好久都沒看懂。在這個commit中對bytecodeInterpreter加入了偏向鎖的支持,我大體了看了下和templateInterpreter對比除了棧結構不一樣外,其餘邏輯大體相同,因此下文就按bytecodeInterpreter中的代碼對偏向鎖邏輯進行講解templateInterpreter的彙編代碼講解能夠看這篇文章,其實彙編源碼中都有英文註釋,瞭解了彙編幾個基本指令的做用再結合註釋理解起來也不是很難。

 

偏向鎖獲取流程

下面開始偏向鎖獲取流程分析,代碼在bytecodeInterpreter.cpp#1816注意本文代碼都有所刪減

CASE(_monitorenter): {
  // lockee 就是鎖對象
  oop lockee = STACK_OBJECT(-1);
  // derefing's lockee ought to provoke implicit null check
  CHECK_NULL(lockee);
  // code 1:找到一個空閒的Lock Record
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  BasicObjectLock* entry = NULL;
  while (most_recent != limit ) {
    if (most_recent->obj() == NULL) entry = most_recent;
    else if (most_recent->obj() == lockee) break;
    most_recent++;
  }
  //entry不爲null,表明還有空閒的Lock Record
  if (entry != NULL) {
    // code 2:將Lock Record的obj指針指向鎖對象
    entry->set_obj(lockee);
    int success = false;
    uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
	// markoop即對象頭的mark word
    markOop mark = lockee->mark();
    intptr_t hash = (intptr_t) markOopDesc::no_hash;
    // code 3:若是鎖對象的mark word的狀態是偏向模式
    if (mark->has_bias_pattern()) {
      uintptr_t thread_ident;
      uintptr_t anticipated_bias_locking_value;
      thread_ident = (uintptr_t)istate->thread();
     // code 4:這裏有幾步操做,下文分析
      anticipated_bias_locking_value =
        (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
        ~((uintptr_t) markOopDesc::age_mask_in_place);
	 // code 5:若是偏向的線程是本身且epoch等於class的epoch
      if  (anticipated_bias_locking_value == 0) {
        // already biased towards this thread, nothing to do
        if (PrintBiasedLockingStatistics) {
          (* BiasedLocking::biased_lock_entry_count_addr())++;
        }
        success = true;
      }
       // code 6:若是偏向模式關閉,則嘗試撤銷偏向鎖
      else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
        markOop header = lockee->klass()->prototype_header();
        if (hash != markOopDesc::no_hash) {
          header = header->copy_set_hash(hash);
        }
        // 利用CAS操做將mark word替換爲class中的mark word
        if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
          if (PrintBiasedLockingStatistics)
            (*BiasedLocking::revoked_lock_entry_count_addr())++;
        }
      }
         // code 7:若是epoch不等於class中的epoch,則嘗試重偏向
      else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
        // 構造一個偏向當前線程的mark word
        markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
        if (hash != markOopDesc::no_hash) {
          new_header = new_header->copy_set_hash(hash);
        }
        // CAS替換對象頭的mark word  
        if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
          if (PrintBiasedLockingStatistics)
            (* BiasedLocking::rebiased_lock_entry_count_addr())++;
        }
        else {
          // 重偏向失敗,表明存在多線程競爭,則調用monitorenter方法進行鎖升級
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
        success = true;
      }
      else {
         // 走到這裏說明當前要麼偏向別的線程,要麼是匿名偏向(即沒有偏向任何線程)
       	// code 8:下面構建一個匿名偏向的mark word,嘗試用CAS指令替換掉鎖對象的mark word
        markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
        if (hash != markOopDesc::no_hash) {
          header = header->copy_set_hash(hash);
        }
        markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
        // debugging hint
        DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
        if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
           // CAS修改爲功
          if (PrintBiasedLockingStatistics)
            (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
        }
        else {
          // 若是修改失敗說明存在多線程競爭,因此進入monitorenter方法
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
        success = true;
      }
    }

    // 若是偏向線程不是當前線程或沒有開啓偏向模式等緣由都會致使success==false
    if (!success) {
      // 輕量級鎖的邏輯
      //code 9: 構造一個無鎖狀態的Displaced Mark Word,並將Lock Record的lock指向它
      markOop displaced = lockee->mark()->set_unlocked();
      entry->lock()->set_displaced_header(displaced);
      //若是指定了-XX:+UseHeavyMonitors,則call_vm=true,表明禁用偏向鎖和輕量級鎖
      bool call_vm = UseHeavyMonitors;
      // 利用CAS將對象頭的mark word替換爲指向Lock Record的指針
      if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
        // 判斷是否是鎖重入
        if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {		//code 10: 若是是鎖重入,則直接將Displaced Mark Word設置爲null
          entry->lock()->set_displaced_header(NULL);
        } else {
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
      }
    }
    UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  } else {
    // lock record不夠,從新執行
    istate->set_msg(more_monitors);
    UPDATE_PC_AND_RETURN(0); // Re-execute
  }
}

再回顧下對象頭中mark word的格式:《死磕Synchronized底層實現--偏向鎖》

JVM中的每一個類也有一個相似mark word的prototype_header,用來標記該class的epoch和偏向開關等信息。上面的代碼中lockee->klass()->prototype_header()即獲取class的prototype_header。

code 1,從當前線程的棧中找到一個空閒的Lock Record即代碼中的BasicObjectLock,下文都用Lock Record代指),判斷Lock Record是否空閒的依據是其obj字段 是否爲null。注意這裏是按內存地址從低往高找到最後一個可用的Lock Record,換而言之,就是找到內存地址最高的可用Lock Record

 

code 2,獲取到Lock Record後,首先要作的就是爲其obj字段賦值。

 

code 3,判斷鎖對象的mark word是不是偏向模式,即低3位是否爲101。

 

code 4,這裏有幾步位運算的操做 anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &  ~((uintptr_t) markOopDesc::age_mask_in_place); 這個位運算能夠分爲3個部分。

 

第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident) 將當前線程id和類的prototype_header相或,這樣獲得的值爲(當前線程id + prototype_header中的(epoch + 分代年齡 + 偏向鎖標誌 + 鎖標誌位)),注意prototype_header的分代年齡那4個字節爲0

 

第二部分 ^ (uintptr_t)mark 將上面計算獲得的結果與鎖對象的markOop進行異或,相等的位所有被置爲0,只剩下不相等的位。

 

第三部分 & ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place爲…0001111000,取反後,變成了…1110000111,除了分代年齡那4位,其餘位全爲1;將取反後的結果再與上面的結果相與,將上面異或獲得的結果中分代年齡給忽略掉。

 

code 5anticipated_bias_locking_value==0表明偏向的線程是當前線程且mark word的epoch等於class的epoch,這種狀況下什麼都不用作。

 

code 6(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0表明class的prototype_header或對象的mark word中偏向模式是關閉的,又由於能走到這已經經過了mark->has_bias_pattern()判斷,即對象的mark word中偏向模式是開啓的,那也就是說class的prototype_header不是偏向模式。

 

而後利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤銷偏向鎖,咱們知道CAS會有幾個參數,1是預期的原值,2是預期修改後的值 ,3是要修改的對象,與之對應,cmpxchg_ptr方法第一個參數是預期修改後的值,第2個參數是修改的對象,第3個參數是預期原值,方法返回實際原值,若是等於預期原值則說明修改爲功。

 

code 7,若是epoch已過時,則須要重偏向,利用CAS指令將鎖對象的mark word替換爲一個偏向當前線程且epoch爲類的epoch的新的mark word

 

code 8,CAS將偏向線程改成當前線程,若是當前是匿名偏向則能修改爲功,不然進入鎖升級的邏輯。

 

code 9,這一步已是輕量級鎖的邏輯了。從上圖的mark word的格式能夠看到,輕量級鎖中mark word存的是指向Lock Record的指針。這裏構造一個無鎖狀態的mark word,而後存儲到Lock RecordLock Record的格式能夠看第一篇文章)。設置mark word是無鎖狀態的緣由是:輕量級鎖解鎖時是將對象頭的mark word設置爲Lock Record中的Displaced Mark Word,因此建立時設置爲無鎖狀態,解鎖時直接用CAS替換就行了。

 

code 10, 若是是鎖重入,則將Lock RecordDisplaced Mark Word設置爲null,起到一個鎖重入計數的做用。

 

以上是偏向鎖加鎖的流程(包括部分輕量級鎖的加鎖流程),若是當前鎖已偏向其餘線程||epoch值過時||偏向模式關閉||獲取偏向鎖的過程當中存在併發衝突,都會進入到InterpreterRuntime::monitorenter方法, 在該方法中會對偏向鎖撤銷和升級。

 

偏向鎖的撤銷

這裏說的撤銷是指在獲取偏向鎖的過程由於不知足條件致使要將鎖對象改成非偏向鎖狀態;釋放是指退出同步塊時的過程,釋放鎖的邏輯會在下一小節闡述。請讀者注意本文中撤銷與釋放的區別

若是獲取偏向鎖失敗會進入到InterpreterRuntime::monitorenter方法

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
  ...
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  ...
IRT_END

能夠看到若是開啓了JVM偏向鎖,那會進入到ObjectSynchronizer::fast_enter方法中。

void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }

 slow_enter (obj, lock, THREAD) ;
}

若是是正常的Java線程,會走上面的邏輯進入到BiasedLocking::revoke_and_rebias方法,若是是VM線程則會走到下面的BiasedLocking::revoke_at_safepoint。咱們主要看BiasedLocking::revoke_and_rebias方法。這個方法的主要做用像它的方法名:撤銷或者重偏向,第一個參數封裝了鎖對象和當前線程,第二個參數表明是否容許重偏向,這裏是true。

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
  assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
    
  markOop mark = obj->mark();
  if (mark->is_biased_anonymously() && !attempt_rebias) {
     //若是是匿名偏向且attempt_rebias==false會走到這裏,如鎖對象的hashcode方法被調用會出現這種狀況,須要撤銷偏向鎖。
    markOop biased_value       = mark;
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
    if (res_mark == biased_value) {
      return BIAS_REVOKED;
    }
  } else if (mark->has_bias_pattern()) {
    // 鎖對象開啓了偏向模式會走到這裏
    Klass* k = obj->klass();
    markOop prototype_header = k->prototype_header();
    //code 1: 若是對應class關閉了偏向模式
    if (!prototype_header->has_bias_pattern()) {
      markOop biased_value       = mark;
      markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
      assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
      return BIAS_REVOKED;
    //code2: 若是epoch過時
    } else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
      if (attempt_rebias) {
        assert(THREAD->is_Java_thread(), "");
        markOop biased_value       = mark;
        markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED_AND_REBIASED;
        }
      } else {
        markOop biased_value       = mark;
        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED;
        }
      }
    }
  }
  //code 3:批量重偏向與批量撤銷的邏輯
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  if (heuristics == HR_NOT_BIASED) {
    return NOT_BIASED;
  } else if (heuristics == HR_SINGLE_REVOKE) {
    //code 4:撤銷單個線程
    Klass *k = obj->klass();
    markOop prototype_header = k->prototype_header();
    if (mark->biased_locker() == THREAD &&
        prototype_header->bias_epoch() == mark->bias_epoch()) {
      // 走到這裏說明須要撤銷的是偏向當前線程的鎖,當調用Object#hashcode方法時會走到這一步
      // 由於只要遍歷當前線程的棧就行了,因此不須要等到safepoint再撤銷。
      ResourceMark rm;
      if (TraceBiasedLocking) {
        tty->print_cr("Revoking bias by walking my own stack:");
      }
      BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
      ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
      assert(cond == BIAS_REVOKED, "why not?");
      return cond;
    } else {
      // 下面代碼最終會在VM線程中的safepoint調用revoke_bias方法
      VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
      VMThread::execute(&revoke);
      return revoke.status_code();
    }
  }
	
  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");
   //code5:批量撤銷、批量重偏向的邏輯
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}

會走到該方法的邏輯有不少,咱們只分析最多見的狀況:假設鎖已經偏向線程A,這時B線程嘗試得到鎖。

上面的code 1code 2B線程都不會走到,最終會走到code 4處,若是要撤銷的鎖偏向的是當前線程則直接調用revoke_bias撤銷偏向鎖,不然會將該操做push到VM Thread中等到safepoint的時候再執行。

關於VM Thread這裏介紹下:在JVM中有個專門的VM Thread,該線程會源源不斷的從VMOperationQueue中取出請求,好比GC請求。對於須要safepoint的操做(VM_Operationevaluate_at_safepoint返回true)必需要等到全部的Java線程進入到safepoint纔開始執行。

接下來咱們着重分析下revoke_bias方法。第一個參數爲鎖對象,第二、3個參數爲都爲false

static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {
  markOop mark = obj->mark();
  // 若是沒有開啓偏向模式,則直接返回NOT_BIASED
  if (!mark->has_bias_pattern()) {
    ...
    return BiasedLocking::NOT_BIASED;
  }

  uint age = mark->age();
  // 構建兩個mark word,一個是匿名偏向模式(101),一個是無鎖模式(001)
  markOop   biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
  markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);

  ...

  JavaThread* biased_thread = mark->biased_locker();
  if (biased_thread == NULL) {
     // 匿名偏向。當調用鎖對象的hashcode()方法可能會致使走到這個邏輯
     // 若是不容許重偏向,則將對象的mark word設置爲無鎖模式
    if (!allow_rebias) {
      obj->set_mark(unbiased_prototype);
    }
    ...
    return BiasedLocking::BIAS_REVOKED;
  }

  // code 1:判斷偏向線程是否還存活
  bool thread_is_alive = false;
  // 若是當前線程就是偏向線程 
  if (requesting_thread == biased_thread) {
    thread_is_alive = true;
  } else {
     // 遍歷當前jvm的全部線程,若是能找到,則說明偏向的線程還存活
    for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
      if (cur_thread == biased_thread) {
        thread_is_alive = true;
        break;
      }
    }
  }
  // 若是偏向的線程已經不存活了
  if (!thread_is_alive) {
    // 容許重偏向則將對象mark word設置爲匿名偏向狀態,不然設置爲無鎖狀態
    if (allow_rebias) {
      obj->set_mark(biased_prototype);
    } else {
      obj->set_mark(unbiased_prototype);
    }
    ...
    return BiasedLocking::BIAS_REVOKED;
  }

  // 線程還存活則遍歷線程棧中全部的Lock Record
  GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
  BasicLock* highest_lock = NULL;
  for (int i = 0; i < cached_monitor_info->length(); i++) {
    MonitorInfo* mon_info = cached_monitor_info->at(i);
    // 若是能找到對應的Lock Record說明偏向的線程還在執行同步代碼塊中的代碼
    if (mon_info->owner() == obj) {
      ...
      // 須要升級爲輕量級鎖,直接修改偏向線程棧中的Lock Record。爲了處理鎖重入的case,在這裏將Lock Record的Displaced Mark Word設置爲null,第一個Lock Record會在下面的代碼中再處理
      markOop mark = markOopDesc::encode((BasicLock*) NULL);
      highest_lock = mon_info->lock();
      highest_lock->set_displaced_header(mark);
    } else {
      ...
    }
  }
  if (highest_lock != NULL) {
    // 修改第一個Lock Record爲無鎖狀態,而後將obj的mark word設置爲指向該Lock Record的指針
    highest_lock->set_displaced_header(unbiased_prototype);
    obj->release_set_mark(markOopDesc::encode(highest_lock));
    ...
  } else {
    // 走到這裏說明偏向線程已經不在同步塊中了
    ...
    if (allow_rebias) {
       //設置爲匿名偏向狀態
      obj->set_mark(biased_prototype);
    } else {
      // 將mark word設置爲無鎖狀態
      obj->set_mark(unbiased_prototype);
    }
  }

  return BiasedLocking::BIAS_REVOKED;
}

須要注意下,當調用鎖對象的Object#hashSystem.identityHashCode()方法會致使該對象的偏向鎖或輕量級鎖升級。這是由於在Java中一個對象的hashcode是在調用這兩個方法時才生成的,若是是無鎖狀態則存放在mark word中,若是是重量級鎖則存放在對應的monitor中,而偏向鎖是沒有地方能存放該信息的,因此必須升級。

言歸正傳,revoke_bias方法邏輯:

  1. 查看偏向的線程是否存活,若是已經不存活了,則直接撤銷偏向鎖。JVM維護了一個集合存放全部存活的線程,經過遍歷該集合判斷某個線程是否存活。

  2. 偏向的線程是否還在同步塊中,若是不在了,則撤銷偏向鎖。咱們回顧一下偏向鎖的加鎖流程:每次進入同步塊(即執行monitorenter)的時候都會以從高往低的順序在棧中找到第一個可用的Lock Record,將其obj字段指向鎖對象。每次解鎖(即執行monitorexit)的時候都會將最低的一個相關Lock Record移除掉。因此能夠經過遍歷線程棧中的Lock Record來判斷線程是否還在同步塊中。

  3. 將偏向線程全部相關Lock RecordDisplaced Mark Word設置爲null,而後將最高位的Lock RecordDisplaced Mark Word 設置爲無鎖狀態,最高位的Lock Record也就是第一次得到鎖時的Lock Record(這裏的第一次是指重入獲取鎖時的第一次),而後將對象頭指向最高位的Lock Record,這裏不須要用CAS指令,由於是在safepoint。 執行完後,就升級成了輕量級鎖。原偏向線程的全部Lock Record都已經變成輕量級鎖的狀態。這裏若是看不明白,請回顧上篇文章的輕量級鎖加鎖過程。

 

偏向鎖的釋放

偏向鎖的釋放入口在bytecodeInterpreter.cpp#1923

CASE(_monitorexit): {
  oop lockee = STACK_OBJECT(-1);
  CHECK_NULL(lockee);
  // derefing's lockee ought to provoke implicit null check
  // find our monitor slot
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  // 從低往高遍歷棧的Lock Record
  while (most_recent != limit ) {
    // 若是Lock Record關聯的是該鎖對象
    if ((most_recent)->obj() == lockee) {
      BasicLock* lock = most_recent->lock();
      markOop header = lock->displaced_header();
      // 釋放Lock Record
      most_recent->set_obj(NULL);
      // 若是是偏向模式,僅僅釋放Lock Record就行了。不然要走輕量級鎖or重量級鎖的釋放流程
      if (!lockee->mark()->has_bias_pattern()) {
        bool call_vm = UseHeavyMonitors;
        // header!=NULL說明不是重入,則須要將Displaced Mark Word CAS到對象頭的Mark Word
        if (header != NULL || call_vm) {
          if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
            // CAS失敗或者是重量級鎖則會走到這裏,先將obj還原,而後調用monitorexit方法
            most_recent->set_obj(lockee);
            CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
          }
        }
      }
      //執行下一條命令
      UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
    }
    //處理下一條Lock Record
    most_recent++;
  }
  // Need to throw illegal monitor state exception
  CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
  ShouldNotReachHere();
}

上面的代碼結合註釋理解起來應該不難,偏向鎖的釋放很簡單,只要將對應Lock Record釋放就行了,而輕量級鎖則須要將Displaced Mark Word替換到對象頭的mark word中。若是CAS失敗或者是重量級鎖則進入到InterpreterRuntime::monitorexit方法中。該方法會在輕量級與重量級鎖的文章中講解。

 

批量重偏向和批量撤銷

批量重偏向和批量撤銷的背景能夠看上篇文章,相關實如今BiasedLocking::revoke_and_rebias中:

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
  ...
  //code 1:重偏向的邏輯
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  // 非重偏向的邏輯
  ...
      
  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");	
   //code 2:批量撤銷、批量重偏向的邏輯
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}

在每次撤銷偏向鎖的時候都經過update_heuristics方法記錄下來,以類爲單位,當某個類的對象撤銷偏向次數達到必定閾值的時候JVM就認爲該類不適合偏向模式或者須要從新偏向另外一個對象,update_heuristics就會返回HR_BULK_REVOKEHR_BULK_REBIAS。進行批量撤銷或批量重偏向。

先看update_heuristics方法。

static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
  markOop mark = o->mark();
  //若是不是偏向模式直接返回
  if (!mark->has_bias_pattern()) {
    return HR_NOT_BIASED;
  }
 
  // 鎖對象的類
  Klass* k = o->klass();
  // 當前時間
  jlong cur_time = os::javaTimeMillis();
  // 該類上一次批量撤銷的時間
  jlong last_bulk_revocation_time = k->last_biased_lock_bulk_revocation_time();
  // 該類偏向鎖撤銷的次數
  int revocation_count = k->biased_lock_revocation_count();
  // BiasedLockingBulkRebiasThreshold是重偏向閾值(默認20),BiasedLockingBulkRevokeThreshold是批量撤銷閾值(默認40),BiasedLockingDecayTime是開啓一次新的批量重偏向距離上次批量重偏向的後的延遲時間,默認25000。也就是開啓批量重偏向後,通過了一段較長的時間(>=BiasedLockingDecayTime),撤銷計數器才超過閾值,那咱們會重置計數器。
  if ((revocation_count >= BiasedLockingBulkRebiasThreshold) &&
      (revocation_count <  BiasedLockingBulkRevokeThreshold) &&
      (last_bulk_revocation_time != 0) &&
      (cur_time - last_bulk_revocation_time >= BiasedLockingDecayTime)) {
    // This is the first revocation we've seen in a while of an
    // object of this type since the last time we performed a bulk
    // rebiasing operation. The application is allocating objects in
    // bulk which are biased toward a thread and then handing them
    // off to another thread. We can cope with this allocation
    // pattern via the bulk rebiasing mechanism so we reset the
    // klass's revocation count rather than allow it to increase
    // monotonically. If we see the need to perform another bulk
    // rebias operation later, we will, and if subsequently we see
    // many more revocation operations in a short period of time we
    // will completely disable biasing for this type.
    k->set_biased_lock_revocation_count(0);
    revocation_count = 0;
  }

  // 自增撤銷計數器
  if (revocation_count <= BiasedLockingBulkRevokeThreshold) {
    revocation_count = k->atomic_incr_biased_lock_revocation_count();
  }
  // 若是達到批量撤銷閾值則返回HR_BULK_REVOKE
  if (revocation_count == BiasedLockingBulkRevokeThreshold) {
    return HR_BULK_REVOKE;
  }
  // 若是達到批量重偏向閾值則返回HR_BULK_REBIAS
  if (revocation_count == BiasedLockingBulkRebiasThreshold) {
    return HR_BULK_REBIAS;
  }
  // 沒有達到閾值則撤銷單個對象的鎖
  return HR_SINGLE_REVOKE;
}

當達到閾值的時候就會經過VM 線程在safepoint調用bulk_revoke_or_rebias_at_safepoint, 參數bulk_rebias若是是true表明是批量重偏向不然爲批量撤銷。attempt_rebias_of_object表明對操做的鎖對象o是否運行重偏向,這裏是true

static BiasedLocking::Condition bulk_revoke_or_rebias_at_safepoint(oop o,
                                                                   bool bulk_rebias,
                                                                   bool attempt_rebias_of_object,
                                                                   JavaThread* requesting_thread) {
  ...
  jlong cur_time = os::javaTimeMillis();
  o->klass()->set_last_biased_lock_bulk_revocation_time(cur_time);


  Klass* k_o = o->klass();
  Klass* klass = k_o;

  if (bulk_rebias) {
    // 批量重偏向的邏輯
    if (klass->prototype_header()->has_bias_pattern()) {
      // 自增前類中的的epoch
      int prev_epoch = klass->prototype_header()->bias_epoch();
      // code 1:類中的epoch自增
      klass->set_prototype_header(klass->prototype_header()->incr_bias_epoch());
      int cur_epoch = klass->prototype_header()->bias_epoch();

      // code 2:遍歷全部線程的棧,更新類型爲該klass的全部鎖實例的epoch
      for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
        GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
        for (int i = 0; i < cached_monitor_info->length(); i++) {
          MonitorInfo* mon_info = cached_monitor_info->at(i);
          oop owner = mon_info->owner();
          markOop mark = owner->mark();
          if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
            // We might have encountered this object already in the case of recursive locking
            assert(mark->bias_epoch() == prev_epoch || mark->bias_epoch() == cur_epoch, "error in bias epoch adjustment");
            owner->set_mark(mark->set_bias_epoch(cur_epoch));
          }
        }
      }
    }

    // 接下來對當前鎖對象進行重偏向
    revoke_bias(o, attempt_rebias_of_object && klass->prototype_header()->has_bias_pattern(), true, requesting_thread);
  } else {
    ...

    // code 3:批量撤銷的邏輯,將類中的偏向標記關閉,markOopDesc::prototype()返回的是一個關閉偏向模式的prototype
    klass->set_prototype_header(markOopDesc::prototype());

    // code 4:遍歷全部線程的棧,撤銷該類全部鎖的偏向
    for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
      GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
      for (int i = 0; i < cached_monitor_info->length(); i++) {
        MonitorInfo* mon_info = cached_monitor_info->at(i);
        oop owner = mon_info->owner();
        markOop mark = owner->mark();
        if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
          revoke_bias(owner, false, true, requesting_thread);
        }
      }
    }

    // 撤銷當前鎖對象的偏向模式
    revoke_bias(o, false, true, requesting_thread);
  }

  ...
  
  BiasedLocking::Condition status_code = BiasedLocking::BIAS_REVOKED;

  if (attempt_rebias_of_object &&
      o->mark()->has_bias_pattern() &&
      klass->prototype_header()->has_bias_pattern()) {
    // 構造一個偏向請求線程的mark word
    markOop new_mark = markOopDesc::encode(requesting_thread, o->mark()->age(),
                                           klass->prototype_header()->bias_epoch());
    // 更新當前鎖對象的mark word
    o->set_mark(new_mark);
    status_code = BiasedLocking::BIAS_REVOKED_AND_REBIASED;
    ...
  }

  ...

  return status_code;
}

該方法分爲兩個邏輯:批量重偏向和批量撤銷。

先看批量重偏向,分爲兩步:

code 1 將類中的撤銷計數器自增1,以後當該類已存在的實例得到鎖時,就會嘗試重偏向,相關邏輯在偏向鎖獲取流程小節中。

code 2 處理當前正在被使用的鎖對象,經過遍歷全部存活線程的棧,找到全部正在使用的偏向鎖對象,而後更新它們的epoch值。也就是說不會重偏向正在使用的鎖,不然會破壞鎖的線程安全性。

批量撤銷邏輯以下:

code 3將類的偏向標記關閉,以後當該類已存在的實例得到鎖時,就會升級爲輕量級鎖;該類新分配的對象的mark word則是無鎖模式。

code 4處理當前正在被使用的鎖對象,經過遍歷全部存活線程的棧,找到全部正在使用的偏向鎖對象,而後撤銷偏向鎖。

相關文章
相關標籤/搜索