public class Test {
public static void main(String[] args){ synchronized(Test.class){ System.out.println("hello"); } } } 複製代碼
截取部分字節碼,以下html
4: monitorenter
5: getstatic #9 // Field java/lang/System.out:Ljava/io/PrintStream; 8: ldc #15 // String hello 10: invokevirtual #17 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 13: aload_1 14: monitorexit 複製代碼
字節碼出現了4: monitorenter和14: monitorexit兩個指令;字面理解就是監視進入,監視退出。能夠理解爲代碼塊執行前的加鎖,和退出同步時的解鎖java
objectMonitor.cpp
ObjectMonitor() { _header = NULL; _count = 0; \\用來記錄該線程獲取鎖的次數 _waiters = 0, _recursions = 0; \\鎖的重入次數 _object = NULL; _owner = NULL; \\當前持有ObjectMonitor的線程 _WaitSet = NULL; \\wait()方法調用後的線程等待隊列 _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; \\阻塞等待隊列 FreeNext = NULL ; _EntryList = NULL ; \\synchronized 進來線程的排隊隊列 _SpinFreq = 0 ; _SpinClock = 0 ; \\自旋計算 OwnerIsThread = 0 ; } 複製代碼
void ATTR ObjectMonitor::enter(TRAPS) {
... //獲取鎖:cmpxchg_ptr原子操做,嘗試將_owner替換爲本身,並返回舊值 cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; ... // 重複獲取鎖,次數加1,返回 if (cur == Self) { _recursions ++ ; return ; } //首次獲取鎖狀況處理 if (Self->is_lock_owned ((address)cur)) { assert (_recursions == 0, "internal state error"); _recursions = 1 ; _owner = Self ; OwnerIsThread = 1 ; return ; } ... //嘗試自旋獲取鎖 if (Knob_SpinEarly && TrySpin (Self) > 0) { ... 複製代碼
void ATTR ObjectMonitor::exit(TRAPS)...
代碼太長,就不貼了。主要是recursions減一、count減小1或者若是線程再也不持有owner(非重入加鎖)則設置owner爲null,退鎖的持有狀態,並喚醒Cxq隊列的線程
總結node
public synchronized void lock(){
System.out.println("world"); } .... public synchronized void lock(); descriptor: ()V flags: (0x0029) ACC_PUBLIC, ACC_SYNCHRONIZED Code: stack=2, locals=0, args_size=0 0: getstatic #20 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #26 // String world 5: invokevirtual #28 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 複製代碼
class ObjectWaiter : public StackObj { public: enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ; enum Sorted { PREPEND, APPEND, SORTED } ; ObjectWaiter * volatile _next; ObjectWaiter * volatile _prev; Thread* _thread; ParkEvent * _event; volatile int _notified ; volatile TStates TState ; Sorted _Sorted ; // List placement disposition bool _active ; // Contention monitoring is enabled public: ObjectWaiter(Thread* thread); void wait_reenter_begin(ObjectMonitor *mon); void wait_reenter_end(ObjectMonitor *mon); }; 複製代碼
調用對象鎖的wait()方法時,線程會被封裝成ObjectWaiter,最後使用park方法掛起git
//objectMonitor.cpp
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS){ ... //線程封裝成 ObjectWaiter對象 ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT ; ... //一系列判斷操做,當線程確實加入WaitSet時,則使用park方法掛起 if (node._notified == 0) { if (millis <= 0) { Self->_ParkEvent->park () ; } else { ret = Self->_ParkEvent->park (millis) ; } } 複製代碼
而當對象鎖使用notify()時github
Atomic::cmpxchg_ptr
指令自旋操做加入
cxq隊列或者直接unpark喚醒
void ObjectMonitor::notify(TRAPS){
CHECK_OWNER(); //waitSet爲空,則直接返回 if (_WaitSet == NULL) { TEVENT (Empty-Notify) ; return ; } ... //經過DequeueWaiter獲取_WaitSet列表中的第一個ObjectWaiter Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ; ObjectWaiter * iterator = DequeueWaiter() ; if (iterator != NULL) { .... if (Policy == 2) { // prepend to cxq // prepend to cxq if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } } 複製代碼
voidObjectMonitor::notifyAll(TRAPS)
,流程和notify相似。不過會經過for循環取出WaitSet的ObjectWaiter節點,再依次喚醒全部線程
先介紹下32位JVM下JAVA對象頭的結構 web
偏向鎖緩存
輕量級鎖安全
重量級鎖多線程
自旋鎖併發
鎖粗化
Test.class
//編譯器會考慮將兩次加鎖合併 public void test(){ synchronized(this){ System.out.println("hello"); } synchronized(this){ System.out.println("world"); } } 複製代碼
-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks
//StringBuffer的append操做會加上synchronized,
//可是變量buf不加鎖也安全的,編譯器會把鎖消除 public void test() { StringBuffer buf = new StringBuffer(); buf.append("hello").append("world"); } 複製代碼
volatile int i = 0; i++
中,volatile類型的讀寫是原子同步的,可是i++卻不能保證同步性,咱們該怎麼呢?
int expectedValue = 1;
public boolean compareAndSet(int newValue) { if(expectedValue == 1){ expectedValue = newValue; return ture; } return false; } 複製代碼
在jdk是有提供同步版的CAS解決方案,其中使用了UnSafe.java的底層方法
//UnSafe.java @HotSpotIntrinsicCandidate public final native boolean compareAndSetInt(Object o, long offset, int expected, int x) .. @HotSpotIntrinsicCandidate public final native int compareAndExchangeInt(Object o, long offset, int expected, int x)... 複製代碼
咱們再來看看本地方法,Unsafe.cpp中的compareAndSwapInt
//unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END 複製代碼
在Linux的x86,Atomic::cmpxchg方法的實現以下
/**
1 __asm__表示彙編的開始; 2 volatile表示禁止編譯器優化;//禁止指令重排 3 LOCK_IF_MP是個內聯函數, 根據當前系統是否爲多核處理器, 決定是否爲cmpxchg指令添加lock前綴 //內存屏障 */ inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; } 複製代碼
到這一步,能夠總結到:jdk提供的CAS機制,在彙編層級,會禁止變量兩側的指令優化,而後使用cmpxchg指令比較並更新變量值(原子性),若是是多核則使用lock鎖定(緩存鎖、MESI)
//僞代碼
volatile state = 0 ; // 0-無鎖 1-加鎖;volatile禁止指令重排,加入內存屏障 ... if(cas(state, 0 , 1)){ // 1 加鎖成功,只有一個線程能成功加鎖 ... // 2 同步代碼塊 cas(state, 1, 0); // 3 解鎖時2的操做具備可見性 } 複製代碼
JavaThread* thread=JavaThread::thread_from_jni_environment(env); ... thread->parker()->park(isAbsolute != 0, time); 複製代碼
class PlatformParker : public CHeapObj { protected: //互斥變量類型 pthread_mutex_t _mutex [1] ; //條件變量類型 pthread_cond_t _cond [1] ; ... } class Parker : public os::PlatformParker { private: volatile int _counter ; ... public: void park(bool isAbsolute, jlong time); void unpark(); ... } 複製代碼
//AbstractQueuedSynchronizer.java
public class AbstractQueuedSynchronizer{ //線程節點 static final class Node { ... volatile Node prev; volatile Node next; volatile Thread thread; ... } .... //head 等待隊列頭尾節點 private transient volatile Node head; private transient volatile Node tail; // The synchronization state. 同步狀態 private volatile int state; ... //提供CAS操做,狀態具體的修改由子類實現 protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); } } 複製代碼
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 複製代碼
在AQS還存一個ConditionObject的內部類,它的使用機制和Object.wait、notify相似
//AbstractQueuedSynchronizer.java public class ConditionObject implements Condition, java.io.Serializable { //條件隊列;Node 複用了AQS中定義的Node private transient Node firstWaiter; private transient Node lastWaiter; ... 複製代碼
//相似Object.wait
public final void await() throws InterruptedException{ ... Node node = addConditionWaiter(); //構造Node,加入條件隊列 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //掛起線程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //notify喚醒線程後,加入同步隊列繼續競爭鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; 複製代碼
//相似Object.notify
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } 複製代碼
protected boolean tryAcquire(int arg);//嘗試獨佔性加鎖 protected boolean tryRelease(int arg);//對應tryAcquire釋放鎖 protected int tryAcquireShared(int arg);//嘗試共享性加鎖 protected boolean tryReleaseShared(int arg);//對應tryAcquireShared釋放鎖 protected boolean isHeldExclusively();//該線程是否正在獨佔資源,只有用到condition才須要取實現它 複製代碼
abstract static class Sync extends AbstractQueuedSynchronizer{
.... final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //直接CAS狀態加鎖,非公平操做 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ... //重寫了tryRelease protected final boolean tryRelease(int releases) { c = state - releases; //改變同步狀態 ... //修改volatile 修飾的狀態變量 setState(c); return free; } } 複製代碼
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } .... static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } .... 複製代碼
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2); @Override public void lockInterruptibly() throws InterruptedException { throw new RuntimeException(""); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {throw new RuntimeException("");} @Override public Condition newCondition() { return sync.newCondition(); } @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } } @Override public boolean tryLock() { return sync.tryAcquireShared(1) > -1; } } 複製代碼
再來看看Sync的代碼
class Sync extends AbstractQueuedSynchronizer {
Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero"); } setState(count); } @Override public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override public boolean tryReleaseShared(int returnCount) { for (; ; ) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } public Condition newCondition() { return new AbstractQueuedSynchronizer.ConditionObject(); } } 複製代碼