本文爲死磕Synchronized底層實現第三篇文章,內容爲輕量級鎖實現。java
輕量級鎖並不複雜,其中不少內容在偏向鎖一文中已說起過,與本文內容會有部分重疊。c++
另外輕量級鎖的背景和基本流程在概論中已有講解。強烈建議在看過兩篇文章的基礎下閱讀本文。git
本系列文章將對HotSpot的synchronized
鎖實現進行全面分析,內容包括偏向鎖、輕量級鎖、重量級鎖的加鎖、解鎖、鎖升級流程的原理及源碼分析,但願給在研究synchronized
路上的同窗一些幫助。主要包括如下幾篇文章:github
更多文章見我的博客:github.com/farmerjohng….net
本文分爲兩個部分:prototype
1.輕量級鎖獲取流程
2.輕量級鎖釋放流程
本人看的JVM版本是jdk8u,具體版本號以及代碼能夠在這裏看到。
下面開始輕量級鎖獲取流程分析,代碼在bytecodeInterpreter.cpp#1816。
CASE(_monitorenter): {
oop lockee = STACK_OBJECT(-1);
...
if (entry != NULL) {
...
// 上面省略的代碼中若是CAS操做失敗也會調用到InterpreterRuntime::monitorenter
// traditional lightweight locking
if (!success) {
// 構建一個無鎖狀態的Displaced Mark Word
markOop displaced = lockee->mark()->set_unlocked();
// 設置到Lock Record中去
entry->lock()->set_displaced_header(displaced);
bool call_vm = UseHeavyMonitors;
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 若是CAS替換不成功,表明鎖對象不是無鎖狀態,這時候判斷下是否是鎖重入
// Is it simple recursive case?
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL);
} else {
// CAS操做失敗則調用monitorenter
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
複製代碼
若是鎖對象不是偏向模式或已經偏向其餘線程,則success
爲false
。這時候會構建一個無鎖狀態的mark word
設置到Lock Record
中去,咱們稱Lock Record
中存儲對象mark word
的字段叫Displaced Mark Word
。
若是當前鎖的狀態不是無鎖狀態,則CAS失敗。若是這是一次鎖重入,那直接將Lock Record
的 Displaced Mark Word
設置爲null
。
咱們看個demo,在該demo中重複3次得到鎖,
synchronized(obj){
synchronized(obj){
synchronized(obj){
}
}
}
複製代碼
假設鎖的狀態是輕量級鎖,下圖反應了mark word
和線程棧中Lock Record
的狀態,能夠看到右邊線程棧中包含3個指向當前鎖對象的Lock Record
。其中棧中最高位的Lock Record
爲第一次獲取鎖時分配的。其Displaced Mark word
的值爲鎖對象的加鎖前的mark word
,以後的鎖重入會在線程棧中分配一個Displaced Mark word
爲null
的Lock Record
。
爲何JVM選擇在線程棧中添加Displaced Mark word
爲null的Lock Record
來表示重入計數呢?首先鎖重入次數是必定要記錄下來的,由於每次解鎖都須要對應一次加鎖,解鎖次數等於加鎖次數時,該鎖才真正的被釋放,也就是在解鎖時須要用到說鎖重入次數的。一個簡單的方案是將鎖重入次數記錄在對象頭的mark word
中,但mark word
的大小是有限的,已經存放不下該信息了。另外一個方案是隻建立一個Lock Record
並在其中記錄重入次數,Hotspot沒有這樣作的緣由我猜是考慮到效率有影響:每次重入得到鎖都須要遍歷該線程的棧找到對應的Lock Record
,而後修改它的值。
因此最終Hotspot選擇每次得到鎖都添加一個Lock Record
來表示鎖的重入。
接下來看看InterpreterRuntime::monitorenter
方法
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
...
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
...
IRT_END
複製代碼
fast_enter
的流程在偏向鎖一文已經分析過,若是當前是偏向模式且偏向的線程還在使用鎖,那會將鎖的mark word
改成輕量級鎖的狀態,同時會將偏向的線程棧中的Lock Record
修改成輕量級鎖對應的形式。代碼位置在biasedLocking.cpp#212。
// 線程還存活則遍歷線程棧中全部的Lock Record
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
BasicLock* highest_lock = NULL;
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
// 若是能找到對應的Lock Record說明偏向的線程還在執行同步代碼塊中的代碼
if (mon_info->owner() == obj) {
...
// 須要升級爲輕量級鎖,直接修改偏向線程棧中的Lock Record。爲了處理鎖重入的case,在這裏將Lock Record的Displaced Mark Word設置爲null,第一個Lock Record會在下面的代碼中再處理
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock();
highest_lock->set_displaced_header(mark);
} else {
...
}
}
if (highest_lock != NULL) {
// 修改第一個Lock Record爲無鎖狀態,而後將obj的mark word設置爲執行該Lock Record的指針
highest_lock->set_displaced_header(unbiased_prototype);
obj->release_set_mark(markOopDesc::encode(highest_lock));
...
} else {
...
}
複製代碼
咱們看slow_enter
的流程。
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
// 若是是無鎖狀態
if (mark->is_neutral()) {
//設置Displaced Mark Word並替換對象頭的mark word
lock->set_displaced_header(mark);
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
} else
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
// 若是是重入,則設置Displaced Mark Word爲null
lock->set_displaced_header(NULL);
return;
}
...
// 走到這一步說明已是存在多個線程競爭鎖了 須要膨脹爲重量級鎖
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
複製代碼
CASE(_monitorexit): {
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
// derefing's lockee ought to provoke implicit null check
// find our monitor slot
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
// 從低往高遍歷棧的Lock Record
while (most_recent != limit ) {
// 若是Lock Record關聯的是該鎖對象
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
markOop header = lock->displaced_header();
// 釋放Lock Record
most_recent->set_obj(NULL);
// 若是是偏向模式,僅僅釋放Lock Record就行了。不然要走輕量級鎖or重量級鎖的釋放流程
if (!lockee->mark()->has_bias_pattern()) {
bool call_vm = UseHeavyMonitors;
// header!=NULL說明不是重入,則須要將Displaced Mark Word CAS到對象頭的Mark Word
if (header != NULL || call_vm) {
if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// CAS失敗或者是重量級鎖則會走到這裏,先將obj還原,而後調用monitorexit方法
most_recent->set_obj(lockee);
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
}
//執行下一條命令
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
//處理下一條Lock Record
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
複製代碼
輕量級鎖釋放時須要將Displaced Mark Word
替換到對象頭的mark word
中。若是CAS失敗或者是重量級鎖則進入到InterpreterRuntime::monitorexit
方法中。
//%note monitor_1
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
Handle h_obj(thread, elem->obj());
...
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// Free entry. This must be done here, since a pending exception might be installed on
//釋放Lock Record
elem->set_obj(NULL);
...
IRT_END
複製代碼
monitorexit
調用完slow_exit
方法後,就釋放Lock Record
。
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
...
markOop dhw = lock->displaced_header();
markOop mark ;
if (dhw == NULL) {
// 重入鎖,什麼也不作
...
return ;
}
mark = object->mark() ;
// 若是是mark word==Displaced Mark Word即輕量級鎖,CAS替換對象頭的mark word
if (mark == (markOop) lock) {
assert (dhw->is_neutral(), "invariant") ;
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
//走到這裏說明是重量級鎖或者解鎖時發生了競爭,膨脹後調用重量級鎖的exit方法。
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
複製代碼
該方法中先判斷是否是輕量級鎖,若是是輕量級鎖則將替換mark word
,不然膨脹爲重量級鎖並調用exit
方法,相關邏輯將在重量級鎖的文章中講解。