從jvm源碼看synchronized

synchronized的使用

synchronized關鍵字是Java中解決併發問題的一種經常使用方法,也是最簡單的一種方法,其做用有三個:(1)互斥性:確保線程互斥的訪問同步代碼(2)可見性:保證共享變量的修改可以及時可見(3)有序性:有效解決重排序問題,其用法也有三個:html

  1. 修飾實例方法
  2. 修飾靜態方法
  3. 修飾代碼塊

修飾實例方法

public class Thread1 implements Runnable{
    //共享資源(臨界資源)
    static int i=0;

    //若是沒有synchronized關鍵字,輸出小於20000
    public synchronized void increase(){
        i++;
    }
    public void run() {
        for(int j=0;j<10000;j++){
            increase();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread1 t=new Thread1();
        Thread t1=new Thread(t);
        Thread t2=new Thread(t);
        t1.start();
        t2.start();
        t1.join();//主線程等待t1執行完畢
        t2.join();//主線程等待t2執行完畢
        System.out.println(i);
    }
    /**
     * 輸出結果:
     * 20000
     */
}

修飾靜態方法

public class Thread1 {
    //共享資源(臨界資源)
    static int i = 0;

    //若是沒有synchronized關鍵字,輸出小於20000
    public static synchronized void increase() {
        i++;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    increase();
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    increase();
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();//主線程等待t1執行完畢
        t2.join();//主線程等待t2執行完畢
        System.out.println(i);
    }
    /**
     * 輸出結果:
     * 20000
     */
}

修飾代碼塊

public class Thread1 implements Runnable{
    //共享資源(臨界資源)
    static int i=0;


    @Override
    public void run() {
        for(int j=0;j<10000;j++){
            //得到了String的類鎖
            synchronized (String.class){
            i++;}
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread1 t=new Thread1();
        Thread t1=new Thread(t);
        Thread t2=new Thread(t);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
    /**
     * 輸出結果:
     * 20000
     */
}

總結

  1. synchronized修飾的實例方法,多線程併發訪問時,只能有一個線程進入,得到對象內置鎖,其餘線程阻塞等待,但在此期間線程仍然能夠訪問其餘方法。
  2. synchronized修飾的靜態方法,多線程併發訪問時,只能有一個線程進入,得到類鎖,其餘線程阻塞等待,但在此期間線程仍然能夠訪問其餘方法。
  3. synchronized修飾的代碼塊,多線程併發訪問時,只能有一個線程進入,根據括號中的對象或者是類,得到相應的對象內置鎖或者是類鎖
  4. 每一個類都有一個類鎖,類的每一個對象也有一個內置鎖,它們是互不干擾的,也就是說一個線程能夠同時得到類鎖和該類實例化對象的內置鎖,當線程訪問非synchronzied修飾的方法時,並不須要得到鎖,所以不會產生阻塞。

Synchronzied的底層原理

對象頭和內置鎖(ObjectMonitor)

根據jvm的分區,對象分配在堆內存中,能夠用下圖表示:
java

對象頭

Hotspot虛擬機的對象頭包括兩部分信息,第一部分用於儲存對象自身的運行時數據,如哈希碼,GC分代年齡,鎖狀態標誌,鎖指針等,這部分數據在32bit和64bit的虛擬機中大小分別爲32bit和64bit,官方稱它爲"Mark word",考慮到虛擬機的空間效率,Mark Word被設計成一個非固定的數據結構以便在極小的空間中存儲儘可能多的信息,它會根據對象的狀態複用本身的存儲空間,詳細狀況以下圖:
node

對象頭的另一部分是類型指針,即對象指向它的類元數據的指針,若是對象訪問定位方式是句柄訪問,那麼該部分沒有,若是是直接訪問,該部分保留。句柄訪問方式以下圖:
安全

直接訪問以下圖:
數據結構

內置鎖(ObjectMonitor)

一般所說的對象的內置鎖,是對象頭Mark Word中的重量級鎖指針指向的monitor對象,該對象是在HotSpot底層C++語言編寫的(openjdk裏面看),簡單看一下代碼:多線程

//結構體以下
ObjectMonitor::ObjectMonitor() {  
  _header       = NULL;  
  _count       = 0;  
  _waiters      = 0,  
  _recursions   = 0;       //線程的重入次數
  _object       = NULL;  
  _owner        = NULL;    //標識擁有該monitor的線程
  _WaitSet      = NULL;    //等待線程組成的雙向循環鏈表,_WaitSet是第一個節點
  _WaitSetLock  = 0 ;  
  _Responsible  = NULL ;  
  _succ         = NULL ;  
  _cxq          = NULL ;    //多線程競爭鎖進入時的單向鏈表
  FreeNext      = NULL ;  
  _EntryList    = NULL ;    //_owner從該雙向循環鏈表中喚醒線程結點,_EntryList是第一個節點
  _SpinFreq     = 0 ;  
  _SpinClock    = 0 ;  
  OwnerIsThread = 0 ;  
}

ObjectMonitor隊列之間的關係轉換能夠用下圖表示:
併發

既然提到了_waitSet和_EntryList(_cxq隊列後面會說),那就看一下底層的wait和notify方法
wait方法的實現過程:app

//1.調用ObjectSynchronizer::wait方法
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
  /*省略 */
  //2.得到Object的monitor對象(即內置鎖)
  ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
  DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
  //3.調用monitor的wait方法
  monitor->wait(millis, true, THREAD);
  /*省略*/
}
  //4.在wait方法中調用addWaiter方法
  inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  /*省略*/
  if (_WaitSet == NULL) {
    //_WaitSet爲null,就初始化_waitSet
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    //不然就尾插
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}
  //5.而後在ObjectMonitor::exit釋放鎖,接着 thread_ParkEvent->park  也就是wait

總結:經過object得到內置鎖(objectMonitor),經過內置鎖將Thread封裝成OjectWaiter對象,而後addWaiter將它插入以_waitSet爲首結點的等待線程鏈表中去,最後釋放鎖。jvm

notify方法的底層實現ide

//1.調用ObjectSynchronizer::notify方法
    void ObjectSynchronizer::notify(Handle obj, TRAPS) {
    /*省略*/
    //2.調用ObjectSynchronizer::inflate方法
    ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
}
    //3.經過inflate方法獲得ObjectMonitor對象
    ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
    /*省略*/
     if (mark->has_monitor()) {
          ObjectMonitor * inf = mark->monitor() ;
          assert (inf->header()->is_neutral(), "invariant");
          assert (inf->object() == object, "invariant") ;
          assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is inva;lid");
          return inf 
      }
    /*省略*/ 
      }
    //4.調用ObjectMonitor的notify方法
    void ObjectMonitor::notify(TRAPS) {
    /*省略*/
    //5.調用DequeueWaiter方法移出_waiterSet第一個結點
    ObjectWaiter * iterator = DequeueWaiter() ;
    //6.後面省略是將上面DequeueWaiter尾插入_EntrySet的操做
    /**省略*/
  }

總結:經過object得到內置鎖(objectMonitor),調用內置鎖的notify方法,經過_waitset結點移出等待鏈表中的首結點,將它置於_EntrySet中去,等待獲取鎖。注意:notifyAll根據policy不一樣可能移入_EntryList或者_cxq隊列中,此處不詳談。

在瞭解對象頭和ObjectMonitor後,接下來咱們結合分析synchronzied的底層實現。

synchronzied的底層原理

synchronized修飾代碼塊

經過下列簡介的代碼來分析:

public class test{
    public void testSyn(){
        synchronized(this){
        }
    }
}

javac編譯,javap -verbose反編譯,結果以下:

/**
 * ...
 **/
  public void testSyn();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=2, locals=3, args_size=1
         0: aload_0              
         1: dup                 
         2: astore_1            
         3: monitorenter        //申請得到對象的內置鎖
         4: aload_1             
         5: monitorexit         //釋放對象內置鎖
         6: goto          14
         9: astore_2
        10: aload_1
        11: monitorexit         //釋放對象內置鎖
        12: aload_2
        13: athrow
        14: return

此處咱們只討論了重量級鎖(ObjectMonitor)的獲取狀況,其餘鎖的獲取放在後面synchronzied的優化中進行說明。源碼以下:

void ATTR ObjectMonitor::enter(TRAPS) {
  Thread * const Self = THREAD ;
  void * cur ;
  //經過CAS操做嘗試把monitor的_owner字段設置爲當前線程
  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
  //獲取鎖失敗
  if (cur == NULL) {
     assert (_recursions == 0   , "invariant") ;
     assert (_owner      == Self, "invariant") ;
     return ;
  }

//若是以前的_owner指向該THREAD,那麼該線程是重入,_recursions++
  if (cur == Self) {
     _recursions ++ ;
     return ;
  }
//若是當前線程是第一次進入該monitor,設置_recursions爲1,_owner爲當前線程
  if (Self->is_lock_owned ((address)cur)) {
    assert (_recursions == 0, "internal state error");
    _recursions = 1 ;   //_recursions標記爲1
    _owner = Self ;     //設置owner
    OwnerIsThread = 1 ;
    return ;
  }
  /**
  *此處省略鎖的自旋優化等操做,統一放在後面synchronzied優化中說
  **/

總結:

  1. 若是monitor的進入數爲0,則該線程進入monitor,而後將進入數設置爲1,該線程即爲monitor的owner
  2. 若是線程已經佔有該monitor,只是從新進入,則進入monitor的進入數加1.
  3. 若是其餘線程已經佔用了monitor,則該線程進入阻塞狀態,直到monitor的進入數爲0,再從新嘗試獲取monitor的全部權

synchronized修飾方法

仍是從簡潔的代碼來分析:

public class test{
    public synchronized  void testSyn(){
    }
}

javac編譯,javap -verbose反編譯,結果以下:

/**
 * ...
 **/
  public synchronized void testSyn();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=0, locals=1, args_size=1
         0: return
      LineNumberTable:
        line 3: 0

結果和synchronized修飾代碼塊的狀況不一樣,仔細比較會發現多了ACC_SYNCHRONIZED這個標識,test.java經過javac編譯造成的test.class文件,在該文件中包含了testSyn方法的方法表,其中ACC_SYNCHRONIZED標誌位是1,當線程執行方法的時候會檢查該標誌位,若是爲1,就自動的在該方法先後添加monitorenter和monitorexit指令,能夠稱爲monitor指令的隱式調用。

上面所介紹的經過synchronzied實現同步用到了對象的內置鎖(ObjectMonitor),而在ObjectMonitor的函數調用中會涉及到Mutex lock等特權指令,那麼這個時候就存在操做系統用戶態和核心態的轉換,這種切換會消耗大量的系統資源,由於用戶態與內核態都有各自專用的內存空間,專用的寄存器等,用戶態切換至內核態須要傳遞給許多變量、參數給內核,內核也須要保護好用戶態在切換時的一些寄存器值、變量等,這也是爲何早期的synchronized效率低的緣由。在jdk1.6以後,從jvm層面作了很大的優化,下面主要介紹作了哪些優化。

synchronized的優化

在瞭解了synchronized重量級鎖效率特別低以後,jdk天然作了一些優化,出現了偏向鎖,輕量級鎖,重量級鎖,自旋等優化,咱們應該改正monitorenter指令就是獲取對象重量級鎖的錯誤認識,很顯然,優化以後,鎖的獲取判斷次序是偏向鎖->輕量級鎖->重量級鎖。

偏向鎖

源碼以下:

//偏向鎖入口
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 //UseBiasedLocking判斷是否開啓偏向鎖
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      //獲取偏向鎖的函數調用
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
 }
 //不能偏向,就獲取輕量級鎖
 slow_enter (obj, lock, THREAD) ;
}

BiasedLocking::revoke_and_rebias調用過程以下流程圖:

偏向鎖的撤銷過程以下:

輕量級鎖

輕量級鎖獲取源碼:

//輕量級鎖入口
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();  //得到Mark Word
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  //是否無鎖不可偏向,標誌001
  if (mark->is_neutral()) {
    //圖A步驟1
    lock->set_displaced_header(mark);
    //圖A步驟2
    if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
      TEVENT (slow_enter: release stacklock) ;
      return ;
    }
    // Fall through to inflate() ...
  } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { //若是Mark Word指向本地棧幀,線程重入
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    lock->set_displaced_header(NULL);//header設置爲null
    return;
  }
  lock->set_displace

  d_header(markOopDesc::unused_mark());
  //輕量級鎖膨脹,膨脹完成以後嘗試獲取重量級鎖
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

輕量級鎖獲取流程以下:

輕量級鎖撤銷源碼:

void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
  assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
  markOop dhw = lock->displaced_header();
  markOop mark ;
  if (dhw == NULL) {//若是header爲null,說明這是線程重入的棧幀,直接返回,不用回寫
     mark = object->mark() ;
     assert (!mark->is_neutral(), "invariant") ;
     if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
        assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
     }
     if (mark->has_monitor()) {
        ObjectMonitor * m = mark->monitor() ;
     }
     return ;
  }

  mark = object->mark() ;
  if (mark == (markOop) lock) {
     assert (dhw->is_neutral(), "invariant") ;
     //CAS將Mark Word內容寫回
     if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
        TEVENT (fast_exit: release stacklock) ;
        return;
     }
  }
  //CAS操做失敗,輕量級鎖膨脹,爲何在撤銷鎖的時候會有失敗的可能?
   ObjectSynchronizer::inflate(THREAD, object)->exit (THREAD) ;
}

輕量級鎖撤銷流程以下:

輕量級鎖膨脹

源代碼:

ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
  assert (Universe::verify_in_progress() ||
          !SafepointSynchronize::is_at_safepoint(), "invariant") ;
  for (;;) { // 爲後面的continue操做提供自旋
      const markOop mark = object->mark() ; //得到Mark Word結構
      assert (!mark->has_bias_pattern(), "invariant") ;

      //Mark Word可能有如下幾種狀態:
      // *  Inflated(膨脹完成)     - just return
      // *  Stack-locked(輕量級鎖) - coerce it to inflated
      // *  INFLATING(膨脹中)     - busy wait for conversion to complete
      // *  Neutral(無鎖)        - aggressively inflate the object.
      // *  BIASED(偏向鎖)       - Illegal.  We should never see this

      if (mark->has_monitor()) {//判斷是不是重量級鎖
          ObjectMonitor * inf = mark->monitor() ;
          assert (inf->header()->is_neutral(), "invariant");
          assert (inf->object() == object, "invariant") ;
          assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
          //Mark->has_monitor()爲true,說明已是重量級鎖了,膨脹過程已經完成,返回
          return inf ;
      }
      if (mark == markOopDesc::INFLATING()) { //判斷是否在膨脹
         TEVENT (Inflate: spin while INFLATING) ;
         ReadStableMark(object) ;
         continue ; //若是正在膨脹,自旋等待膨脹完成
      }

      if (mark->has_locker()) { //若是當前是輕量級鎖
          ObjectMonitor * m = omAlloc (Self) ;//返回一個對象的內置ObjectMonitor對象
          m->Recycle();
          m->_Responsible  = NULL ;
          m->OwnerIsThread = 0 ;
          m->_recursions   = 0 ;
          m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;//設置自旋獲取重量級鎖的次數
          //CAS操做標識Mark Word正在膨脹
          markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
          if (cmp != mark) {
             omRelease (Self, m, true) ;
             continue ;   //若是上述CAS操做失敗,自旋等待膨脹完成
          }
          m->set_header(dmw) ;
          m->set_owner(mark->locker());//設置ObjectMonitor的_owner爲擁有對象輕量級鎖的線程,而不是當前正在inflate的線程
          m->set_object(object);
          /**
          *省略了部分代碼
          **/
          return m ;
      }
  }
}

輕量級鎖膨脹流程圖:

如今來回答下以前提出的問題:爲何在撤銷輕量級鎖的時候會有失敗的可能?
假設thread1擁有了輕量級鎖,Mark Word指向thread1棧幀,thread2請求鎖的時候,就會膨脹初始化ObjectMonitor對象,將Mark Word更新爲指向ObjectMonitor的指針,那麼在thread1退出的時候,CAS操做會失敗,由於Mark Word再也不指向thread1的棧幀,這個時候thread1自旋等待infalte完畢,執行重量級鎖的退出操做

重量級鎖

重量級鎖的獲取入口:

void ATTR ObjectMonitor::enter(TRAPS) {
  Thread * const Self = THREAD ;
  void * cur ;
  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
  if (cur == NULL) {
     assert (_recursions == 0   , "invariant") ;
     assert (_owner      == Self, "invariant") ;
     return ;
  }

  if (cur == Self) {
     _recursions ++ ;
     return ;
  }

  if (Self->is_lock_owned ((address)cur)) {
    assert (_recursions == 0, "internal state error");
    _recursions = 1 ;
    // Commute owner from a thread-specific on-stack BasicLockObject address to
    // a full-fledged "Thread *".
    _owner = Self ;
    OwnerIsThread = 1 ;
    return ;
  }
  /**
  *上述部分在前面已經分析過,再也不累述
  **/

  Self->_Stalled = intptr_t(this) ;
  //TrySpin是一個自旋獲取鎖的操做,此處就不列出源碼了
  if (Knob_SpinEarly && TrySpin (Self) > 0) {
     Self->_Stalled = 0 ;
     return ;
  }
  /*
  *省略部分代碼
  */
    for (;;) {
      EnterI (THREAD) ;
      /**
      *省略了部分代碼
      **/
  }
}

進入EnterI (TRAPS)方法(這段代碼我的以爲頗有意思):

void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    if (TryLock (Self) > 0) {
        //這下不自旋了,我就默默的TryLock一下
        return ;
    }

    DeferredInitialize () ;
    //此處又有自旋獲取鎖的操做
    if (TrySpin (Self) > 0) {
        return ;
    }
    /**
    *到此,自旋終於全失敗了,要入隊掛起了
    **/
    ObjectWaiter node(Self) ; //將Thread封裝成ObjectWaiter結點
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ; 
    node.TState  = ObjectWaiter::TS_CXQ ; 
    ObjectWaiter * nxt ;
    for (;;) { //循環,保證將node插入隊列
        node._next = nxt = _cxq ;//將node插入到_cxq隊列的首部
        //CAS修改_cxq指向node
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
        if (TryLock (Self) > 0) {//我再默默的TryLock一下,真的是不想掛起呀!
            return ;
        }
    }
    if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
        // Try to assume the role of responsible thread for the monitor.
        // CONSIDER:  ST vs CAS vs { if (Responsible==null) Responsible=Self }
        Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    }
    TEVENT (Inflated enter - Contention) ;
    int nWakeups = 0 ;
    int RecheckInterval = 1 ;

    for (;;) {
        if (TryLock (Self) > 0) break ;//臨死以前,我再TryLock下

        if ((SyncFlags & 2) && _Responsible == NULL) {
           Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
        }
        if (_Responsible == Self || (SyncFlags & 1)) {
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
        } else {
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ; //終於掛起了
        }

        if (TryLock(Self) > 0) break ;
        /**
        *後面代碼省略
        **/
}

try了那麼屢次lock,接下來看下TryLock:

int ObjectMonitor::TryLock (Thread * Self) {
   for (;;) {
      void * own = _owner ;
      if (own != NULL) return 0 ;//若是有線程還擁有着重量級鎖,退出
      //CAS操做將_owner修改成當前線程,操做成功return>0
      if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
         return 1 ;
      }
      //CAS更新失敗return<0
      if (true) return -1 ;
   }
}

重量級鎖獲取入口流程圖:

重量級鎖的出口:

void ATTR ObjectMonitor::exit(TRAPS) {
   Thread * Self = THREAD ;
   if (THREAD != _owner) {
     if (THREAD->is_lock_owned((address) _owner)) {
       _owner = THREAD ;
       _recursions = 0 ;
       OwnerIsThread = 1 ;
     } else {
       TEVENT (Exit - Throw IMSX) ;
       if (false) {
          THROW(vmSymbols::java_lang_IllegalMonitorStateException());
       }
       return;
     }
   }
   if (_recursions != 0) {
     _recursions--;        // 若是_recursions次數不爲0.自減
     TEVENT (Inflated exit - recursive) ;
     return ;
   }
   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }

   for (;;) {
      if (Knob_ExitPolicy == 0) {
         OrderAccess::release_store_ptr (&_owner, NULL) ;   // drop the lock
         OrderAccess::storeload() ;                         
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            TEVENT (Inflated exit - simple egress) ;
            return ;
         }
         TEVENT (Inflated exit - complex egress) ;
         if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
            return ;
         }
         TEVENT (Exit - Reacquired) ;
      } else {
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            OrderAccess::release_store_ptr (&_owner, NULL) ;  
            OrderAccess::storeload() ;
            if (_cxq == NULL || _succ != NULL) {
                TEVENT (Inflated exit - simple egress) ;
                return ;
            }
            if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
               TEVENT (Inflated exit - reacquired succeeded) ;
               return ;
            }
            TEVENT (Inflated exit - reacquired failed) ;
         } else {
            TEVENT (Inflated exit - complex egress) ;
         }
      }
      ObjectWaiter * w = NULL ;
      int QMode = Knob_QMode ;
      if (QMode == 2 && _cxq != NULL) {
          /**
          *模式2:cxq隊列的優先權大於EntryList,直接從cxq隊列中取出一個線程結點,準備喚醒
          **/
          w = _cxq ;
          ExitEpilog (Self, w) ;
          return ;
      }

      if (QMode == 3 && _cxq != NULL) {
          /**
          *模式3:將cxq隊列插入到_EntryList尾部
          **/
          w = _cxq ;
          for (;;) {
             //CAS操做取出cxq隊列首結點
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ; //更新w,自旋
          }
          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ; //改變ObjectWaiter狀態
              //下面兩句爲cxq隊列反向構造一條鏈,即將cxq變成雙向鏈表
              p->_prev = q ;
              q = p ;
          }
          ObjectWaiter * Tail ;
          //得到_EntryList尾結點
          for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
          if (Tail == NULL) {
              _EntryList = w ;//_EntryList爲空,_EntryList=w
          } else {
              //將w插入_EntryList隊列尾部
              Tail->_next = w ;
              w->_prev = Tail ;
          }
   }

      if (QMode == 4 && _cxq != NULL) {
         /**
         *模式四:將cxq隊列插入到_EntryList頭部
         **/
          w = _cxq ;
          for (;;) {
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }
          if (_EntryList != NULL) {
            //q爲cxq隊列最後一個結點
              q->_next = _EntryList ;
              _EntryList->_prev = q ;
          }
          _EntryList = w ;
       }

      w = _EntryList  ;
      if (w != NULL) {
          ExitEpilog (Self, w) ;//從_EntryList中喚醒線程
          return ;
      }
      w = _cxq ;
      if (w == NULL) continue ; //若是_cxq和_EntryList隊列都爲空,自旋

      for (;;) {
          //自旋再得到cxq首結點
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
      /**
      *下面執行的是:cxq不爲空,_EntryList爲空的狀況
      **/
      if (QMode == 1) {//結合前面的代碼,若是QMode == 1,_EntryList不爲空,直接從_EntryList中喚醒線程
         // QMode == 1 : drain cxq to EntryList, reversing order
         // We also reverse the order of the list.
         ObjectWaiter * s = NULL ;
         ObjectWaiter * t = w ;
         ObjectWaiter * u = NULL ;
         while (t != NULL) {
             guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
             t->TState = ObjectWaiter::TS_ENTER ;
             //下面的操做是雙向鏈表的倒置
             u = t->_next ;
             t->_prev = u ;
             t->_next = s ;
             s = t;
             t = u ;
         }
         _EntryList  = s ;//_EntryList爲倒置後的cxq隊列
      } else {
         // QMode == 0 or QMode == 2
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             //構形成雙向的
             p->_prev = q ;
             q = p ;
         }
      }
      if (_succ != NULL) continue;
      w = _EntryList  ;
      if (w != NULL) {
          ExitEpilog (Self, w) ; //從_EntryList中喚醒線程
          return ;
      }
   }
}

ExitEpilog用來喚醒線程,代碼以下:

void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
   assert (_owner == Self, "invariant") ;
   _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
   ParkEvent * Trigger = Wakee->_event ;
   Wakee  = NULL ;
   OrderAccess::release_store_ptr (&_owner, NULL) ;
   OrderAccess::fence() ;                            
   if (SafepointSynchronize::do_call_back()) {
      TEVENT (unpark before SAFEPOINT) ;
   }
   DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
   Trigger->unpark() ; //喚醒線程
   // Maintain stats and report events to JVMTI
   if (ObjectMonitor::_sync_Parks != NULL) {
      ObjectMonitor::_sync_Parks->inc() ;
   }
}

重量級鎖出口流程圖:

自旋

經過對源碼的分析,發現多處存在自旋和tryLock操做,那麼這些操做好很差,若是tryLock過少,大部分線程都會掛起,由於在擁有對象鎖的線程釋放鎖後不能及時感知,致使用戶態和核心態狀態轉換較多,效率低下,極限思惟就是:沒有自旋,全部線程掛起,若是tryLock過多,存在兩個問題:1. 即便自旋避免了掛起,可是自旋的代價超過了掛起,得不償失,那我還不如不要自旋了。 2. 若是自旋仍然不能避免大部分掛起的話,那就是又自旋又掛起,效率過低。極限思惟就是:無限自旋,白白浪費了cpu資源,因此在代碼中每一個自旋和tryLock的插入應該都是通過測試後決定的。

編譯期間鎖優化

鎖消除

仍是先看一下簡潔的代碼

public class test {
    public String test(String s1,String s2) {
        return s1+s2;
 }
}

javac javap後:

public class test {
  public test();
    Code:
       0: aload_0
       1: invokespecial #1                  // Method java/lang/Object."<init>":()V
       4: return

  public java.lang.String test(java.lang.String, java.lang.String);
    Code:
       0: new           #2                  // class java/lang/StringBuilder
       3: dup
       4: invokespecial #3                  // Method java/lang/StringBuilder."<init>":()V
       7: aload_1
       8: invokevirtual #4                  // Method java/lang/StringBuilder.append:(Ljava/lang/String;)Ljava/lang/StringBuilder;
      11: aload_2
      12: invokevirtual #4                  // Method java/lang/StringBuilder.append:(Ljava/lang/String;)Ljava/lang/StringBuilder;
      15: invokevirtual #5                  // Method java/lang/StringBuilder.toString:()Ljava/lang/String;
      18: areturn
}

上述字節碼等價成java代碼爲:

public class test {
    public String test(String s1,String s2) {
        StringBuilder sb = new StringBuilder();
        sb.append(s1);
        sb.append(s2);
        return sb.toString();
 }
}

sb的append方法是同步的,可是sb是在方法內部,每一個運行的線程都會實例化一個StringBuilder對象,在私有棧持有該對象引用(其餘線程沒法獲得),也就是說sb不存在多線程訪問,那麼在jvm運行期間,即時編譯器就會將鎖消除

鎖粗化

將前面的代碼稍微變一下:

public class test {
    StringBuilder sb = new StringBuilder();
    public String test(String s1,String s2) {
        sb.append(s1);
        sb.append(s2);
        return sb.toString();
 }
}

首先能夠肯定的是這段代碼不能鎖消除優化,由於sb是類的實例變量,會被多線程訪問,存在線程安全問題,那麼訪問test方法的時候就會對sb對象,加鎖,解鎖,加鎖,解鎖,很顯然這一過程將會大大下降效率,所以在即時編譯的時候會進行鎖粗化,在sb.appends(s1)以前加鎖,在sb.append(s2)執行完後釋放鎖。

總結

引入偏向鎖的目的:在只有單線程執行狀況下,儘可能減小沒必要要的輕量級鎖執行路徑,輕量級鎖的獲取及釋放依賴屢次CAS原子指令,而偏向鎖只依賴一次CAS原子指令置換ThreadID,以後只要判斷線程ID爲當前線程便可,偏向鎖使用了一種等到競爭出現才釋放鎖的機制,消除偏向鎖的開銷仍是蠻大的。若是同步資源或代碼一直都是多線程訪問的,那麼消除偏向鎖這一步驟對你來講就是多餘的,能夠經過-XX:-UseBiasedLocking=false來關閉
引入輕量級鎖的目的:在多線程交替執行同步塊的狀況下,儘可能避免重量級鎖引發的性能消耗(用戶態和核心態轉換),可是若是多個線程在同一時刻進入臨界區,會致使輕量級鎖膨脹升級重量級鎖,因此輕量級鎖的出現並不是是要替代重量級鎖
重入:對於不一樣級別的鎖都有重入策略,偏向鎖:單線程獨佔,重入只用檢查threadId等於該線程;輕量級鎖:重入將棧幀中lock record的header設置爲null,重入退出,只用彈出棧幀,直到最後一個重入退出CAS寫回數據釋放鎖;重量級鎖:重入_recursions++,重入退出_recursions--,_recursions=0時釋放鎖
最後放一張摘自網上的一張大圖(保存本地,方便食用):

ps:源碼解讀與流程圖都是原創,可能會有貽誤,歡迎指正,過程不易,若是以爲幫到了你,就點個推薦吧!

參考資料

http://blog.csdn.net/javazejian/article/details/72828483 http://www.cnblogs.com/paddix/p/5405678.html http://www.cnblogs.com/paddix/p/5367116.html https://www.jianshu.com/p/c5058b6fe8e5 http://blog.csdn.net/zqz_zqz/article/details/70233767 http://blog.csdn.net/u012465296/article/details/53022317 https://www.zhihu.com/question/39009953?sort=created http://blog.sina.com.cn/s/blog_c038e9930102v2hs.html http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/runtime/synchronizer.cpp http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/runtime/objectMonitor.cpp

相關文章
相關標籤/搜索