該文章屬於《Java併發編程》系列文章,若是想了解更多,請點擊《Java併發編程之總目錄》html
上篇文章咱們講了volatile關鍵字,咱們大體瞭解了其爲輕量級的同步機制,如今咱們來說講咱們關於同步的另外一個兄弟synchronized。synchronized做爲開發中經常使用的同步機制,也是咱們處理線程安全的經常使用方法。相信你們對其都比較熟悉。可是對於其內部原理與底層代碼實現你們有可能不是很瞭解,下面我就和你們一塊兒完全瞭解synchronized的使用方式與底層原理。java
線程安全的定義:當多個線程訪問某個類時,無論運行時環境採用何種調度方式或者這些線程將如何交替執行,而且在主調代碼中不須要任何額外的同步或協同,這個類都能表現出正確的行爲,那麼這個類就是線程安全的。node
在具體講解synchronized以前,咱們須要瞭解一下什麼是線程安全,爲何會出現線程線程不安全的問題。請看下列代碼:編程
class ThreadNotSafeDemo {
private static class Count {
private int num;
private void count() {
for (int i = 1; i <= 10; i++) {
num += i;
}
System.out.println(Thread.currentThread().getName() + "-" + num);
}
}
public static void main(String[] args) {
Runnable runnable = new Runnable() {
Count count = new Count();
public void run() {
count.count();
}
};
//建立10個線程,
for (int i = 0; i < 10; i++) {
new Thread(runnable).start();
}
}
}
複製代碼
上述代碼中,咱們建立Count類,在該類中有一個count()方法,計算從1一直加到10的和,在計算完後輸出當前線程的名稱與計算的結果,咱們指望線程輸出的結果是首項爲55且等差爲55的等差數列。可是結果並非咱們指望的。具體結果以下圖所示:數組
咱們能夠看見,線程並無按照咱們之間想的那樣,線程按照從Thread-0到Thread-9依次排列,而且Thread-0與Thread-1線程輸出的結果是錯誤的。安全
之因此會出現這樣的狀況,是CPU在調度的時候線程是能夠交替執行的,具體來說是由於當前線程Thread-0求和後,(求和後num值爲55),在即將執行打印語句時,忽然CPU開始調度執行Thread-1去執行count()方法,那麼Thread-0就會停留在即將打印語句的位置,當Thread-1執行計算和後(求和後num值爲100),這個時候CPU又開始調度Thread-0執行打印語句。則Thread-1開始暫停,而這個時候num值已經爲110了,因此Thread-0打印輸出的結果爲110。bash
上面咱們瞭解了之因此會出現線程安全的問題,主要緣由就是由於存在多條線程共同操做共享數據,同時CPU的調度的時候線程是能夠交替執行的。致使了程序的語義發生改變,因此會出現與咱們預期的結果違背的狀況。所以爲了解決這個問題,在Java中提供了兩種方式來處理這種狀況。數據結構
互斥同步是指當存在多個線程操做共享數據時,須要保證同一時刻有且只有一個線程在操做共享數據,其餘線程必須等到該線程處理完數據後再進行。多線程
在Java中最基本的互斥同步就是synchronized(這裏咱們討論的是jdk1.6以前,在jdk1.6以後Java團隊對鎖進行了優化,後面文章會具體描述),也就是說當一個共享數據被當前正在訪問的線程加上互斥鎖後,在同一個時刻,其餘線程只能處於等待的狀態,直到當前線程處理完畢釋放該鎖。併發
除了synchronized以外,咱們還可使用java.util.concurrent包下的ReentrantLock來實現同步。
互斥同步主要的問題就是進行線程阻塞和喚醒鎖帶來的性能問題,爲了解決這性能問題,咱們有另外一種解決方案,當多個線程競爭某個共享數據時,沒有得到鎖的線程不會阻塞,而是不斷的嘗試去獲取鎖,直到成功爲止。這種方案的原理就是使用循環CAS操做來實現。
瞭解了synchronized的解決的問題,那麼咱們繼續來看看在Java中在Java中synchronized的使用狀況。
在Java中synchronized主要有三種使用的狀況。下面分別列出了這幾種狀況
爲了證實普通的同步方法中,鎖是當前對象。請觀察如下代碼:
class SynchronizedDemo {
public synchronized void normalMethod() {
doPrint(5);
}
public void blockMethod() {//注意,同步塊方法塊中,配置的是當前類的對象
synchronized (this) {
doPrint(5);
}
}
//打印當前線程信息與角標值
private static void doPrint(int index) {
while (index-- > 0) {
System.out.println(Thread.currentThread().getName() + "--->" + index);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SynchronizedDemo demo = new SynchronizedDemo();
new Thread(() -> demo.normalMethod(), "testNormalMethod").start();
new Thread(() -> demo.normalMethod(), "testBlockMethod").start();
}
}
複製代碼
在上訴代碼中,分別建立了兩個方法,normalMethod()與blockMethod()方法,其中normalMethod()方法爲普通的同步方法,blockMethod()方法中,是一個同步塊且配置的對象是當前類的對象。在Main()方法中,分別建立兩個線程執行兩個不一樣的方法。
class SynchronizedDemo {
public void blockMethod() {
synchronized (SynchronizedDemo.class) {//注意,同步塊方法塊中,配置的是當前類的Class對象
doPrint(5);
}
}
public static synchronized void staticMethod() {
doPrint(5);
}
/**
* 打印當前線程信息
*/
private static void doPrint(int index) {
while (index-- > 0) {
System.out.println(Thread.currentThread().getName() + "--->" + index);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SynchronizedDemo demo = new SynchronizedDemo();
new Thread(() -> demo.blockMethod(), "testBlockMethod").start();
new Thread(() -> demo.staticMethod(), "testStaticMethod").start();
}
}
複製代碼
在有了第一個結論的證實後,對於靜態同步方法的鎖對象就再也不進行描述了(可是你們要注意一下,同步方法塊中配置的對象是當前類的Class對象)。下面直接給出輸出結果:
觀察結果,也很明顯的證實了對於靜態同步方法,鎖式當前類的Class對象的結論
下面文章主要是講解jdk1.6以後Java團隊對鎖進行了優化以後的原理,優化以後涉及到偏向鎖、輕量級鎖、重量級鎖。其中該文章都涉及jdk源碼,這裏把最新的jdk源碼分享給你們----->jdk源碼)
在瞭解Synchronized的原理的原理以前,咱們須要知道三個知識點第一個是CAS操做,、第二個是Java對象頭(其中Synchronized使用的鎖就在對象頭中)、第三個是jdk1.6對鎖的優化。在瞭解以上三個知識點後,再去理解其原理就相對輕鬆一點。關於CAS操做已經在上篇文章《Java併發編程之Java CAS操做》進行過講解,下面咱們來說解關於Java對象頭與鎖優化的知識點。
在Java虛擬機中,對象在內存的存儲的佈局能夠分爲3塊區域:對象頭(Header)、實例數據(Instance Data)、對其填充(Padding)。其中虛擬機中的對象頭包括三部分信息,分別爲"Mark Word"、類型指針、記錄數組長度的數據(可選),具體狀況以下圖所示:
其中關於"Mark Word",由於存儲對象頭信息是與對象身定義的數據無關的額外的存儲成本,考慮到虛擬機的空間效率,"Mark Word"被設計成一個被設計成一個非固定的數據結構以便在極小的空間存儲儘可能多的信息。它會根據對象的狀態複用本身的存儲區域。在JVM中,「Mark Word"的實現是在markOop.hpp文件中的markOopDesc類。經過註釋咱們大體瞭解」Mark Word"的結構,具體代碼以下:
hash:保存對象的哈希碼
age:GC分代年齡
biased_lock:偏向鎖標誌
lock:鎖狀態標誌
JavaThread* 當前線程
epoch:保存偏向時間戳
// 32 bits:
// --------
// hash:25 ------------>| age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:23 epoch:2 age:4 biased_lock:1 lock:2 (biased object)
// 省略部分代碼
// 64 bits:
// --------
// unused:25 hash:31 -->| unused:1 age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:54 epoch:2 unused:1 age:4 biased_lock:1 lock:2 (biased object)
// 省略部分代碼
複製代碼
在上述代碼中,分紅了2種不一樣位數的操做系統,32位與64位。其中關於當前鎖的狀態標誌markOopDesc類中也進行了詳細的說明,具體代碼以下:
enum { locked_value = 0,//輕量級鎖 對應[00]
unlocked_value = 1,//無鎖狀態 對應[01]
monitor_value = 2,//重量級鎖 對應[10]
marked_value = 3,//GC標記 對應[11]
biased_lock_pattern = 5//是不是偏向鎖 對應[101] 其中biased_lock一個bit位,lock兩個bit位
};
複製代碼
那麼根據上述代碼,咱們以32位操做系統爲例,能夠生成以下兩張表:
Java SE 1.6爲了減小得到鎖和釋放鎖帶來的性能消耗,引入了「偏向鎖」和「輕量級鎖」,在Java SE 1.6中,鎖一共有4種狀態,級別從低到高依次是:無鎖狀態、偏向鎖狀態、輕量級鎖狀態和重量級鎖狀態,這幾個狀態會隨着競爭狀況逐漸升級。鎖能夠升級但不能降級,意味着偏向鎖升級成輕量級鎖後不能降級成偏向鎖。這種鎖升級卻不能降級的策略,目的是爲了提升得到鎖和釋放鎖的效率。下面會對各類鎖進行介紹。
在瞭解了上述知識點後,咱們來了解一下synchronized底層代碼實現。從JVM規範中能夠看到Synchonized在JVM裏的實現原理,JVM基於進入和退出Monitor對象來實現方法同步和代碼塊同步,但二者的實現細節不同。代碼塊同步是使用monitorenter和monitorexit指令實現的,而方法同步是使用字節碼同步指令ACC_SYNCHRONIZED來實現的,細節在JVM規範裏並無詳細說明。可是方法的同步一樣可使用這兩個指令來實現。那咱們這裏咱們就以synchronized代碼塊底層原理來進行講解。
字節碼同步指令ACC_SYNCHRONIZED原理:JVM經過使用管程(Monitor)來支持同步,JVM能夠從方法常量池的方法表結構中的ACC_SYNCHRONIZED訪問標誌來得知一個方法是否聲明爲同步方法,當方法調用時,調用指令將會檢查方法的ACC_SYNCHRONIZED訪問標誌是否被設置,若是設置了,執行線程就要求先成功持有管程(Monitor),而後才能執行方法,最後當方法完成(不管是正常完成仍是非正常完成)時釋放管程,在方法執行期間,執行線程持有了管程,其餘任何線程都沒法在獲取到同一個管程。
在瞭解 synchronized代碼塊底層原理以前,咱們先了解咱們經常使用的synchronized代碼塊使用方式。
public class SyncCodeBlock {
public int i;
public void syncTask(){
//同步代碼庫
synchronized (this){
i++;
}
}
}
複製代碼
而後咱們經過javap指令反編譯獲得字節碼。
//===========主要看看syncTask方法實現================
public void syncTask();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=3, args_size=1
0: aload_0
1: dup
2: astore_1
3: monitorenter //注意此處,進入同步方法
4: aload_0
5: dup
6: getfield #2 // Field i:I
9: iconst_1
10: iadd
11: putfield #2 // Field i:I
14: aload_1
15: monitorexit //注意此處,退出同步方法
16: goto 24
19: astore_2
20: aload_1
21: monitorexit //注意此處,退出同步方法
22: aload_2
23: athrow
24: return
Exception table:
//省略其餘字節碼.......
}
複製代碼
從上訴代碼中,咱們能夠明白當咱們聲明synchronized代碼塊的時候,編譯器會咱們生產相應的monitorenter 與monitorexit 指令。當咱們的JVM把字節碼加載到內存的時候,會對這兩個指令進行解析。其中關於monitorenter 與monitorenter的指令解析是經過InterpreterRuntime.cpp文件中的InterpreterRuntime::monitorenter與InterpreterRuntime::monitorexit兩個函數分別實現的。
在瞭解具體的方法實現之間,咱們須要瞭解兩個參數信息,第一個參數猜都都猜出來,當前線程的指針,第二個參數爲BasicObjectLock類型的指針,那咱們來看看BasicObjectLock究竟是什麼東西。
關於BasicObjectLock的類具體聲明是在basicLock.hpp文件下。
class BasicLock {
friend class VMStructs;
friend class JVMCIVMStructs;
private:
//指向"Mark Word「也就是咱們提到過的markOopDesc的指針。這裏markOop是markOopDesc的別名 volatile markOop _displaced_header; public: //獲取"Mark Word「也就是咱們提到過的markOopDesc,這裏markOop是markOopDesc的別名
markOop displaced_header() const { return _displaced_header; }
void set_displaced_header(markOop header) { _displaced_header = header; }
//省略部分代碼
};
class BasicObjectLock {
friend class VMStructs;
private:
BasicLock _lock; //擁有BasicLock對象
oop _obj;
//省略部分代碼
};
複製代碼
從該文件中,咱們知道在BasicLock類中指向"Mark Word「的指針,同時BasicObjectLock 也擁有BasicLock對象,那麼BasicObjectLock 就能訪問」Mark Word「中的內容了。那如今咱們再來看上面提到的兩個對應的方法。
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
//省略部分代碼
if (UseBiasedLocking) {//判斷是否使用偏向鎖
//若是是使用偏向鎖,則進入快速獲取,避免沒必要要的膨脹。
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {//不然直接走輕量級鎖的獲取
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
//省略部分代碼
複製代碼
當monitorenter方法執行時,會先判斷當前是否開啓偏向鎖(偏向鎖在Java 6和Java 7裏是默認啓用的,可是它在應用程序啓動幾秒鐘以後才激活,若有必要可使用JVM參數來關閉延遲:-XX:BiasedLockingStartupDelay=0。若是你肯定應用程序裏全部的鎖一般狀況下處於競爭狀態,能夠經過JVM參數關閉偏向鎖:-XX:-UseBiasedLocking=false,那麼程序默認會進入輕量級鎖狀態),若是沒有開啓會直接走輕量級鎖的獲取,也就是slow_enter()方法。
ObjectSynchronizer::fast_enter()方法是在sychronizer.cpp文件進行聲明的,具體代碼以下:
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock,
bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {//若是使用偏向鎖
if (!SafepointSynchronize::is_at_safepoint()) {//若是不在安全點,獲取當前偏向鎖的狀態(可能撤銷與重偏向)
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {//若是是撤銷與重偏向直接返回
return;
}
} else {//若是在安全點,有可能會撤銷偏向鎖
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
//省略部分代碼
}
slow_enter(obj, lock, THREAD);//若是不使用偏向鎖,則走輕量級鎖的獲取
}
複製代碼
在該方法中若是當前JVM支持偏向鎖,會須要等待全局安全點(在這個時間點上沒有正在執行的字節碼),若是當前不在安全點中,會調用revoke_and_rebias()方法來獲取當前偏向鎖的狀態(可能爲撤銷或撤銷後重偏向)。若是在安全點,會根據當前偏向鎖的狀態來判斷是否須要撤銷偏向鎖。其中revoke_and_rebias()方法是在biasedLocking.cpp中進行聲明的。
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
markOop mark = obj->mark();
//第一步,若是沒有其餘線程佔用該對象(mark word中線程id爲0,後三位爲101,且不嘗試重偏向)
//這裏「fast enter()方法"傳入的attempt_rebias爲true if (mark->is_biased_anonymously() && !attempt_rebias) { //通常來說,只有在從新計算對象hashCode的時候纔會進入該分支, //因此直接用用CAS操做將對象設置爲無鎖狀態 markOop biased_value = mark; markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age()); markOop res_mark = obj->cas_set_mark(unbiased_prototype, mark);//cas 操做重新設置偏向鎖的狀態 if (res_mark == biased_value) {//若是CAS操做失敗,說明存在競爭,偏向鎖爲撤銷狀態 return BIAS_REVOKED; } } else if (mark->has_bias_pattern()) { //第二步,判斷當前偏向鎖是否已經鎖定(無論mark word中線程id是否爲null),嘗試重偏向 Klass* k = obj->klass(); markOop prototype_header = k->prototype_header(); if (!prototype_header->has_bias_pattern()) { //第三步若是有線程對該對象進行了全局鎖定(即同步了靜態方法/屬性),則取消偏向操做 markOop biased_value = mark; markOop res_mark = obj->cas_set_mark(prototype_header, mark); assert(!obj->mark()->has_bias_pattern(), "even if we raced, should still be revoked"); return BIAS_REVOKED;//偏向鎖爲撤銷狀態 } else if (prototype_header->bias_epoch() != mark->bias_epoch()) { //第四步,若是偏向鎖時間過時,(這個時候有另外一個線程經過偏向鎖獲取到了這個對象的鎖) if (attempt_rebias) {//第五步,若是偏向鎖開啓,從新經過cas操做更新時間戳與分代年齡。 assert(THREAD->is_Java_thread(), ""); markOop biased_value = mark; markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch()); markOop res_mark = obj->cas_set_mark(rebiased_prototype, mark); if (res_mark == biased_value) { return BIAS_REVOKED_AND_REBIASED;//撤銷偏移後重偏向。 } } else {//第六步,若是偏向鎖關閉,經過CAS操做更新分代年齡 markOop biased_value = mark; markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age()); markOop res_mark = obj->cas_set_mark(unbiased_prototype, mark); if (res_mark == biased_value) { return BIAS_REVOKED;////若是CAS操做失敗,說明存在競爭,偏向鎖爲撤銷狀態 } } } } //省略部分代碼... } 複製代碼
偏向鎖的獲取由BiasedLocking::revoke_and_rebias方法實現,主要分爲五個步驟
在上文中咱們提到了在調用fast_enter()方法時,若是在安全點,這時會根據偏向鎖的狀態來判斷是否須要撤銷偏向鎖,也就是調用revoke_at_safepoint()方法。其中該方法也是在biasedLocking.cpp中進行聲明的,具體代碼以下:
void BiasedLocking::revoke_at_safepoint(Handle h_obj) {
assert(SafepointSynchronize::is_at_safepoint(), "must only be called while at safepoint");
oop obj = h_obj();
HeuristicsResult heuristics = update_heuristics(obj, false);//得到偏向鎖偏向與撤銷的次數
if (heuristics == HR_SINGLE_REVOKE) {//若是是一次撤銷
revoke_bias(obj, false, false, NULL, NULL);
} else if ((heuristics == HR_BULK_REBIAS) ||//若是是屢次撤銷或屢次偏向
(heuristics == HR_BULK_REVOKE)) {
bulk_revoke_or_rebias_at_safepoint(obj, (heuristics == HR_BULK_REBIAS), false, NULL);
}
clean_up_cached_monitor_info();
}
複製代碼
觀察代碼咱們能夠發現,會根據當前偏向鎖偏向與撤銷的次數走不一樣的方法。這裏咱們以revoke_bias()方法爲例,來進行講解。具體代碼以下:
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread, JavaThread** biased_locker) {
//省略部分代碼...
uint age = mark->age();
markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);
JavaThread* biased_thread = mark->biased_locker();
if (biased_thread == NULL) {//判斷當前偏向鎖中,偏向線程id是否爲null
if (!allow_rebias) {//若是不容許重偏向,則使其偏向鎖不可用。
obj->set_mark(unbiased_prototype);
}
//省略部分代碼...
return BiasedLocking::BIAS_REVOKED;
}
//判斷當前偏向鎖偏向的線程是否存在
bool thread_is_alive = false;
if (requesting_thread == biased_thread) {
thread_is_alive = true;
} else {
ThreadsListHandle tlh;
thread_is_alive = tlh.includes(biased_thread);
}
if (!thread_is_alive) {//若是當前偏向鎖偏向的線程不存活
if (allow_rebias) {
obj->set_mark(biased_prototype);//若是容許偏向,則將偏向鎖中的 線程id置爲null
} else {
obj->set_mark(unbiased_prototype);//不然,將偏向鎖設置爲無鎖狀態 也就是01
}
return BiasedLocking::BIAS_REVOKED;
}
//遍歷當前鎖記錄,找到擁有鎖的線程,將須要的displaced headers 寫到線程堆棧中。
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
BasicLock* highest_lock = NULL;
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
if (oopDesc::equals(mon_info->owner(), obj)) {
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock();
highest_lock->set_displaced_header(mark);//將dispalece headers 寫入堆棧中
}
//省略部分代碼...
}
if (highest_lock != NULL) {//將須要的displaced headers 寫到線程堆棧
//省略部分代碼...
highest_lock->set_displaced_header(unbiased_prototype);
//省略部分代碼...
obj->release_set_mark(markOopDesc::encode(highest_lock));
//省略部分代碼...
} else {//將對象的頭恢復到未鎖定或無偏狀態
//省略部分代碼...
if (allow_rebias) {
obj->set_mark(biased_prototype);
} else {
// Store the unlocked value into the object's header. obj->set_mark(unbiased_prototype); } } //獲取偏向鎖指向的線程 if (biased_locker != NULL) { *biased_locker = biased_thread; } return BiasedLocking::BIAS_REVOKED; } 複製代碼
在偏向鎖的撤銷,須要等待全局全局點(這個時間點沒有在執行的字節碼),它會首先暫停擁有偏向鎖的線程,而後檢查持有偏向鎖的線程是否活着,若是線程不處於活動狀態。會更將偏向鎖設置爲無鎖狀態,若是線程仍然活着,擁有偏向鎖的棧 會被執行,遍歷偏向對象的鎖記錄,棧中的鎖記錄和對象頭的Mark Word要麼從新偏向於其餘線程,要麼恢復到無鎖或者標記對象不適合做爲偏向鎖,最後喚醒暫停的線程。
在上文中咱們說過當monitorenter指令執行時,若是當前偏向鎖沒有開啓或多個線程競爭偏向鎖致使偏向鎖升級爲輕量級鎖時,那麼會直接走輕量級的鎖的獲取。在講解輕量級鎖的獲取以前,須要講解一個知識點」Displaced Mark Word"。
在代碼進入同步塊,執行輕量級鎖獲取以前,若是此同步對象沒有被鎖定(鎖標誌爲01狀態),JVM會在當前線程的幀棧中創建一個名爲鎖記錄(Lock Record)的空間,用於存儲對象目前的"Mark Word"的拷貝(官方把這份拷貝加了一個Displaced前綴,及Displaced Mark Word)。虛擬機將使用CAS操做嘗試將對象的「Mark word"更新爲指向Lock Record的指針,若是這個更新動做成功了,那麼這個現場就擁有了該對象的鎖,及該對象處於輕量級鎖定狀態。關於輕量級鎖的獲取,具體示意圖以下:
在瞭解了具體的輕量級鎖獲取流程後,咱們來查看具體的實現slow_enter()方法。該方法是在sychronizer.cpp文件進行聲明的。具體代碼以下:
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();//第一步 獲取鎖對象的「mark word" if (mark->is_neutral()) {//第二步,判斷當前鎖是不是無鎖狀態 後兩位標誌位爲01 //第三步,若是是無鎖狀態,存儲對象目前的「mark word"拷貝,
//經過CAS嘗試將鎖對象Mark Word更新爲指向lock Record對象的指針,
lock->set_displaced_header(mark);
if (mark == obj()->cas_set_mark((markOop) lock, mark)) {
TEVENT(slow_enter: release stacklock); //若是更新成功,表示得到鎖,則執行同步代碼,
return;
}
}
//第四步,若是當前mark處於加鎖狀態,且線程幀棧中的owner指向當前鎖,則執行同步代碼,
else if (mark->has_locker() &&
THREAD->is_lock_owned((address)mark->locker())) {
lock->set_displaced_header(NULL);
return;
}
//第五步,不然說明有多個線程競爭輕量級鎖,輕量級鎖須要膨脹升級爲重量級鎖;
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD,
obj(),
inflate_cause_monitor_enter)->enter(THREAD);
}
複製代碼
在輕量級鎖的獲取中,主要分爲五個步驟,主要步驟以下:
適用情形:假設線程A和B同時執行到臨界區if (mark->is_neutral()):
在上文中,咱們講過當走完同步塊的時候,會執行monitorexit指令,而輕量級鎖的釋放這正是在monitorexit執行的時候,也就是InterpreterRuntime::monitorexit()。
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// Free entry. This must be done here, since a pending exception might be installed on
// exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
elem->set_obj(NULL);
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
複製代碼
在monitorexit()方法中內部會調用slow_exit()方法而slow_exit()方法內部會調用fast_exit()方法,咱們查看fast_exit()方法。
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
markOop mark = object->mark();
//省略部分代碼...
markOop dhw = lock->displaced_header();//獲取線程堆棧中的Displaced Mark Word
if (dhw == NULL) {//若是線程堆棧中的Displaced Mark Word爲null
#ifndef PRODUCT
if (mark != markOopDesc::INFLATING()) {//若是當前輕量級鎖不是在膨脹爲重量級鎖
//省略部分代碼...
if (mark->has_monitor()) {//若是已經爲重量級鎖,直接返回
ObjectMonitor * m = mark->monitor();
assert(((oop)(m->object()))->mark() == mark, "invariant");
assert(m->is_entered(THREAD), "invariant");
}
}
#endif
return;
}
//若是當前線程擁有輕量級鎖,那麼經過CAS嘗試把Displaced Mark Word替換到當前鎖對象的Mark Word,
//若是CAS成功,說明成功的釋放了鎖
if (mark == (markOop) lock) {
assert(dhw->is_neutral(), "invariant");
if (object->cas_set_mark(dhw, mark) == mark) {
TEVENT(fast_exit: release stack-lock);
return;
}
}
//若是CAS操做失敗,說明其餘線程在嘗試獲取輕量級鎖,這個時候須要將輕量級鎖升級爲重量級鎖
ObjectSynchronizer::inflate(THREAD,
object,
inflate_cause_vm_internal)->exit(true, THREAD);
}
複製代碼
在偏向鎖的釋放中,會經歷一下幾個步驟。
在上文中咱們提到過,在多個線程進行輕量級鎖的獲取時或在輕量級鎖撤銷時,有肯能會膨脹爲重量級鎖,那如今咱們就來看看膨脹的具體過程
ObjectMonitor* ObjectSynchronizer::inflate(Thread * Self,
oop object,
const InflateCause cause) {
EventJavaMonitorInflate event;
for (;;) {//開始自旋
const markOop mark = object->mark();
// The mark can be in one of the following states:
// * Inflated - just return
// * Stack-locked - coerce it to inflated
// * INFLATING - busy wait for conversion to complete
// * Neutral - aggressively inflate the object.
// * BIASED - Illegal. We should never see this
//1.若是當前鎖已經爲重量級鎖了,直接返回
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor();
return inf;
}
//2.若是正在膨脹的過程當中,在完成膨脹過程當中,其餘線程必須等待。
if (mark == markOopDesc::INFLATING()) {
TEVENT(Inflate: spin while INFLATING);
ReadStableMark(object);
continue;
}
//3.若是當前爲輕量級鎖,迫使其膨脹爲重量級鎖
if (mark->has_locker()) {
ObjectMonitor * m = omAlloc(Self);
m->Recycle();
m->_Responsible = NULL;
m->_recursions = 0;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit; // Consider: maintain by type/class
markOop cmp = object->cas_set_mark(markOopDesc::INFLATING(), mark);
if (cmp != mark) {
omRelease(Self, m, true);
continue; // Interference -- just retry
}
markOop dmw = mark->displaced_mark_helper();
assert(dmw->is_neutral(), "invariant");
m->set_header(dmw);
m->set_owner(mark->locker());
m->set_object(object);
// TODO-FIXME: assert BasicLock->dhw != 0.
// Must preserve store ordering. The monitor state must
// be stable at the time of publishing the monitor address.
guarantee(object->mark() == markOopDesc::INFLATING(), "invariant");
object->release_set_mark(markOopDesc::encode(m));
// Hopefully the performance counters are allocated on distinct cache lines
// to avoid false sharing on MP systems ...
OM_PERFDATA_OP(Inflations, inc());
TEVENT(Inflate: overwrite stacklock);
if (log_is_enabled(Debug, monitorinflation)) {
if (object->is_instance()) {
ResourceMark rm;
log_debug(monitorinflation)("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
p2i(object), p2i(object->mark()),
object->klass()->external_name());
}
}
if (event.should_commit()) {
post_monitor_inflate_event(&event, object, cause);
}
return m;
}
//4.若是爲無鎖狀態,重置監視器狀態
assert(mark->is_neutral(), "invariant");
ObjectMonitor * m = omAlloc(Self);
// prepare m for installation - set monitor to initial state
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->_recursions = 0;
m->_Responsible = NULL;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit; // consider: keep metastats by type/class
if (object->cas_set_mark(markOopDesc::encode(m), mark) != mark) {
m->set_object(NULL);
m->set_owner(NULL);
m->Recycle();
omRelease(Self, m, true);
m = NULL;
continue;
}
//省略部分代碼...
return m ;
}
複製代碼
在輕量級鎖膨脹爲重量級鎖大體能夠分爲如下幾個過程
在瞭解了偏向鎖、輕量級鎖,與重量級鎖的原理後,如今咱們來總結一下整個鎖升級的流程。具體以下圖所示:
在上文中,咱們主要介紹了整個鎖升級的流程與源代碼實現。而真正線程的等待與競爭咱們尚未詳細描述。下面咱們就來說講當鎖膨脹爲重量級鎖的時候,整個線程的競爭與等待過程。重量級鎖的競爭是在objectMonitor.cpp中ObjectMonitor::enter()方法中實現的。
在講解具體的鎖獲取以前,咱們須要瞭解每一個鎖對象(這裏指已經升級爲重量級鎖的對象)都有一個ObjectMonitor(對象監視器)。也就是說每一個線程獲取鎖對象都會經過ObjectMonitor。代碼以下所示:(這裏我省略了一些沒必要要的屬性。你們只須要看一些關鍵的結構)
class ObjectMonitor {
public:
enum {
OM_OK, // 沒有錯誤
OM_SYSTEM_ERROR, // 系統錯誤
OM_ILLEGAL_MONITOR_STATE, // 監視器狀態異常
OM_INTERRUPTED, // 當前線程已經中斷
OM_TIMED_OUT // 線程等待超時
};
volatile markOop _header; // 線程幀棧中存儲的 鎖對象的mark word拷貝
protected: // protected for JvmtiRawMonitor
void * volatile _owner; // 指向得到objectMonitor的線程或者 BasicLock對象
volatile jlong _previous_owner_tid; // 上一個得到objectMonitor的線程id
volatile intptr_t _recursions; // 同一線程重入鎖的次數,若是是0,表示第一次進入
ObjectWaiter * volatile _EntryList; // 在進入或者重進入阻塞狀態下的線程鏈表
protected:
ObjectWaiter * volatile _WaitSet; // 處於等待狀態下的線程鏈表
volatile jint _waiters; //處於等待狀態下的線程個數
複製代碼
在瞭解了ObjectMonitor 類中具體結構後,來看看具體的鎖獲取方法ObjectMonitor::enter(),具體代碼以下所示:
void ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD;//當前進入enter方法的線程
//經過CAS操做嘗試吧monitor的_owner( 指向得到objectMonitor的線程或者 BasicLock對象)設置爲當前線程
void * cur = Atomic::cmpxchg(Self, &_owner, (void*)NULL);
if (cur == NULL) {//若是成功,當前線程獲取鎖成功,直接執行同步代碼塊
assert(_recursions == 0, "invariant");
assert(_owner == Self, "invariant");
return;
}
//若是是同一線程,則記錄當前重入的次數(上一步CAS操做無論成功仍是失敗,都會返回_owner指向的地址)
if (cur == Self) {
_recursions++;
return;
}
//若是以前_owner指向的BasicLock在當前線程棧上,說明當前線程是第一次進入該monitor,
//設置_recursions爲1,_owner爲當前線程,該線程成功得到鎖並返回;
if (Self->is_lock_owned ((address)cur)) {
assert(_recursions == 0, "internal state error");
_recursions = 1;
_owner = Self;
return;
}
//省略部
分代碼...
//開始競爭鎖
for (;;) {
jt->set_suspend_equivalent();
EnterI(THREAD);
if (!ExitSuspendEquivalent(jt)) break;
_recursions = 0;
_succ = NULL;
exit(false, Self);
jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
//省略部分代碼...
}
複製代碼
在重量級級鎖的競爭步驟,主要分爲如下幾個步驟:
void ObjectMonitor::EnterI(TRAPS) {
Thread * const Self = THREAD;
//省略部分代碼...
//把當前線程被封裝成ObjectWaiter的node對象,狀態設置成ObjectWaiter::TS_CXQ;
ObjectWaiter node(Self);
Self->_ParkEvent->reset();
node._prev = (ObjectWaiter *) 0xBAD;
node.TState = ObjectWaiter::TS_CXQ;//TS_CXQ:爲競爭鎖狀態
//在for循環中,經過CAS把node節點push到_cxq鏈表中;
ObjectWaiter * nxt;
for (;;) {
node._next = nxt = _cxq;
//若是CAS操做失敗,繼續嘗試,是由於當期_cxq鏈表已經發生改變了
if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;
//有可能在放入_cxq鏈表中時,已經獲取到鎖了,直接返回
if (TryLock (Self) > 0) {
assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
}
//將node節點push到_cxq鏈表以後,經過自旋嘗試獲取鎖
for (;;) {
if (TryLock(Self) > 0) break;//嘗試獲取鎖
assert(_owner != Self, "invariant");
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::replace_if_null(Self, &_Responsible);
}
//判斷執行循環的次數,若是執行相應循環後,若是仍是沒有獲取到鎖,則經過park函數將當前線程掛起,等待被喚醒
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT(Inflated enter - park TIMED);
Self->_ParkEvent->park((jlong) recheckInterval);
// Increase the recheckInterval, but clamp the value.
recheckInterval *= 8;
if (recheckInterval > MAX_RECHECK_INTERVAL) { 其中MAX_RECHECK_INTERVAL爲1000
recheckInterval = MAX_RECHECK_INTERVAL;
}
} else {
TEVENT(Inflated enter - park UNTIMED);
Self->_ParkEvent->park();
}
//省略部分代碼...
OrderAccess::fence();
}
//省略部分代碼...
return;
}
複製代碼
關於EnterI()方法,能夠分爲如下步驟:
關於獲取鎖的TryLock方法以下所示:
int ObjectMonitor::TryLock(Thread * Self) {
void * own = _owner;
if (own != NULL) return 0;
if (Atomic::replace_if_null(Self, &_owner)) {
return 1;
}
return -1;
}
複製代碼
該函數其實很簡單,就是將鎖中的_owner指針指向當前線程,若是成功返回1,反之返回-1。
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * const Self = THREAD;
if (THREAD != _owner) {//若是當前鎖對象中的_owner沒有指向當前線程
if (THREAD->is_lock_owned((address) _owner)) {
//可是_owner指向的BasicLock在當前線程棧上,那麼將_owner指向當前線程
assert(_recursions == 0, "invariant");
_owner = THREAD;
_recursions = 0;
} else {
//省略部分代碼...
return;
}
}
//若是當前,線程重入鎖的次數,不爲0,那麼就從新走ObjectMonitor::exit,直到重入鎖次數爲0爲止
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT(Inflated exit - recursive);
return;
}
//省略部分代碼...
for (;;) {
if (Knob_ExitPolicy == 0) {
OrderAccess::release_store(&_owner, (void*)NULL); //釋放鎖
OrderAccess::storeload(); // See if we need to wake a successor
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT(Inflated exit - simple egress);
return;
}
TEVENT(Inflated exit - complex egress);
//省略部分代碼...
}
//省略部分代碼...
ObjectWaiter * w = NULL;
int QMode = Knob_QMode;
//根據QMode的模式判斷,
//若是QMode == 2則直接從_cxq掛起的線程中喚醒
if (QMode == 2 && _cxq != NULL) {
w = _cxq;
ExitEpilog(Self, w);
return;
}
//省略部分代碼... 省略的代碼爲根據QMode的不一樣,不一樣的喚醒機制
}
}
//省略部分代碼...
}
複製代碼
重量級鎖的釋放能夠分爲如下步驟:
寫了這麼久,終於寫完了~~~ 掌聲在哪裏?
該篇文章主要是根據先關博客與本身對源碼的理解,發現其實有不少東西本身仍是描述的不是很清楚。主要緣由是C++代碼看的我頭大。我的感受Java的整個鎖的機制其實涉及到蠻多的東西,本身理解的只是冰山一角,若是你們對代碼或者文章不理解,請輕噴。我也是看的半懂半懂的。原諒我啦~~~
站在巨人的肩膀上能看的更遠~~~
《深刻理解Java虛擬機:JVM高級特性與最佳實踐》
《Java併發編程的藝術》