悲觀鎖html
樂觀鎖java
自旋鎖node
public class Test { public static void main(String[] args){ synchronized(Test.class){ System.out.println("hello"); } } }
截取部分字節碼,以下git
4: monitorenter 5: getstatic #9 // Field java/lang/System.out:Ljava/io/PrintStream; 8: ldc #15 // String hello 10: invokevirtual #17 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 13: aload_1 14: monitorexit
字節碼出現了4: monitorenter和14: monitorexit兩個指令;字面理解就是監視進入,監視退出。能夠理解爲代碼塊執行前的加鎖,和退出同步時的解鎖github
objectMonitor.cpp ObjectMonitor() { _header = NULL; _count = 0; \\用來記錄該線程獲取鎖的次數 _waiters = 0, _recursions = 0; \\鎖的重入次數 _object = NULL; _owner = NULL; \\當前持有ObjectMonitor的線程 _WaitSet = NULL; \\wait()方法調用後的線程等待隊列 _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; \\阻塞等待隊列 FreeNext = NULL ; _EntryList = NULL ; \\synchronized 進來線程的排隊隊列 _SpinFreq = 0 ; _SpinClock = 0 ; \\自旋計算 OwnerIsThread = 0 ; }
void ATTR ObjectMonitor::enter(TRAPS) { ... //獲取鎖:cmpxchg_ptr原子操做,嘗試將_owner替換爲本身,並返回舊值 cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; ... // 重複獲取鎖,次數加1,返回 if (cur == Self) { _recursions ++ ; return ; } //首次獲取鎖狀況處理 if (Self->is_lock_owned ((address)cur)) { assert (_recursions == 0, "internal state error"); _recursions = 1 ; _owner = Self ; OwnerIsThread = 1 ; return ; } ... //嘗試自旋獲取鎖 if (Knob_SpinEarly && TrySpin (Self) > 0) { ...
void ATTR ObjectMonitor::exit(TRAPS)...
代碼太長,就不貼了。主要是recursions減一、count減小1或者若是線程再也不持有owner(非重入加鎖)則設置owner爲null,退鎖的持有狀態,並喚醒Cxq隊列的線程總結緩存
public synchronized void lock(){ System.out.println("world"); } .... public synchronized void lock(); descriptor: ()V flags: (0x0029) ACC_PUBLIC, ACC_SYNCHRONIZED Code: stack=2, locals=0, args_size=0 0: getstatic #20 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #26 // String world 5: invokevirtual #28 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
class ObjectWaiter : public StackObj { public: enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ; enum Sorted { PREPEND, APPEND, SORTED } ; ObjectWaiter * volatile _next; ObjectWaiter * volatile _prev; Thread* _thread; ParkEvent * _event; volatile int _notified ; volatile TStates TState ; Sorted _Sorted ; // List placement disposition bool _active ; // Contention monitoring is enabled public: ObjectWaiter(Thread* thread); void wait_reenter_begin(ObjectMonitor *mon); void wait_reenter_end(ObjectMonitor *mon); };
調用對象鎖的wait()方法時,線程會被封裝成ObjectWaiter,最後使用park方法掛起安全
//objectMonitor.cpp void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS){ ... //線程封裝成 ObjectWaiter對象 ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT ; ... //一系列判斷操做,當線程確實加入WaitSet時,則使用park方法掛起 if (node._notified == 0) { if (millis <= 0) { Self->_ParkEvent->park () ; } else { ret = Self->_ParkEvent->park (millis) ; } }
而當對象鎖使用notify()時多線程
Atomic::cmpxchg_ptr
指令自旋操做加入cxq隊列或者直接unpark喚醒void ObjectMonitor::notify(TRAPS){ CHECK_OWNER(); //waitSet爲空,則直接返回 if (_WaitSet == NULL) { TEVENT (Empty-Notify) ; return ; } ... //經過DequeueWaiter獲取_WaitSet列表中的第一個ObjectWaiter Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ; ObjectWaiter * iterator = DequeueWaiter() ; if (iterator != NULL) { .... if (Policy == 2) { // prepend to cxq // prepend to cxq if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } }
voidObjectMonitor::notifyAll(TRAPS)
,流程和notify相似。不過會經過for循環取出WaitSet的ObjectWaiter節點,再依次喚醒全部線程偏向鎖併發
輕量級鎖app
重量級鎖
自旋鎖
鎖粗化
Test.class //編譯器會考慮將兩次加鎖合併 public void test(){ synchronized(this){ System.out.println("hello"); } synchronized(this){ System.out.println("world"); } }
鎖消除
-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks
//StringBuffer的append操做會加上synchronized, //可是變量buf不加鎖也安全的,編譯器會把鎖消除 public void test() { StringBuffer buf = new StringBuffer(); buf.append("hello").append("world"); }
其餘鎖優化方法
volatile int i = 0; i++
中,volatile類型的讀寫是原子同步的,可是i++卻不能保證同步性,咱們該怎麼呢?int expectedValue = 1; public boolean compareAndSet(int newValue) { if(expectedValue == 1){ expectedValue = newValue; return ture; } return false; }
在jdk是有提供同步版的CAS解決方案,其中使用了UnSafe.java的底層方法
//UnSafe.java @HotSpotIntrinsicCandidate public final native boolean compareAndSetInt(Object o, long offset, int expected, int x) .. @HotSpotIntrinsicCandidate public final native int compareAndExchangeInt(Object o, long offset, int expected, int x)...
咱們再來看看本地方法,Unsafe.cpp中的compareAndSwapInt
//unsafe.cpp UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END
在Linux的x86,Atomic::cmpxchg方法的實現以下
/** 1 __asm__表示彙編的開始; 2 volatile表示禁止編譯器優化;//禁止指令重排 3 LOCK_IF_MP是個內聯函數, 根據當前系統是否爲多核處理器, 決定是否爲cmpxchg指令添加lock前綴 //內存屏障 */ inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; }
到這一步,能夠總結到:jdk提供的CAS機制,在彙編層級,會禁止變量兩側的指令優化,而後使用cmpxchg指令比較並更新變量值(原子性),若是是多核則使用lock鎖定(緩存鎖、MESI)
ABA問題
只能保證一個共享變量的原子操做
先說說實現鎖的要素
要素1:能夠利用CAS的原子性來實現,任意時刻只有一個線程能成功操做變量
要素2:使用volatile修飾狀態變量,禁止指令重排
要素3:仍是用volatile,volatile變量寫指令先後會插入內存屏障
//僞代碼 volatile state = 0 ; // 0-無鎖 1-加鎖;volatile禁止指令重排,加入內存屏障 ... if(cas(state, 0 , 1)){ // 1 加鎖成功,只有一個線程能成功加鎖 ... // 2 同步代碼塊 cas(state, 1, 0); // 3 解鎖時2的操做具備可見性 }
JavaThread* thread=JavaThread::thread_from_jni_environment(env); ... thread->parker()->park(isAbsolute != 0, time);
class PlatformParker : public CHeapObj { protected: //互斥變量類型 pthread_mutex_t _mutex [1] ; //條件變量類型 pthread_cond_t _cond [1] ; ... } class Parker : public os::PlatformParker { private: volatile int _counter ; ... public: void park(bool isAbsolute, jlong time); void unpark(); ... }
unpark和park執行順序不一樣時,counter和cond的狀態變化以下
//AbstractQueuedSynchronizer.java public class AbstractQueuedSynchronizer{ //線程節點 static final class Node { ... volatile Node prev; volatile Node next; volatile Thread thread; ... } .... //head 等待隊列頭尾節點 private transient volatile Node head; private transient volatile Node tail; // The synchronization state. 同步狀態 private volatile int state; ... //提供CAS操做,狀態具體的修改由子類實現 protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); } }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
在AQS還存一個ConditionObject的內部類,它的使用機制和Object.wait、notify相似
//AbstractQueuedSynchronizer.java public class ConditionObject implements Condition, java.io.Serializable { //條件隊列;Node 複用了AQS中定義的Node private transient Node firstWaiter; private transient Node lastWaiter; ...
//相似Object.wait public final void await() throws InterruptedException{ ... Node node = addConditionWaiter(); //構造Node,加入條件隊列 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //掛起線程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //notify喚醒線程後,加入同步隊列繼續競爭鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
//相似Object.notify private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
protected boolean tryAcquire(int arg);//嘗試獨佔性加鎖 protected boolean tryRelease(int arg);//對應tryAcquire釋放鎖 protected int tryAcquireShared(int arg);//嘗試共享性加鎖 protected boolean tryReleaseShared(int arg);//對應tryAcquireShared釋放鎖 protected boolean isHeldExclusively();//該線程是否正在獨佔資源,只有用到condition才須要取實現它
abstract static class Sync extends AbstractQueuedSynchronizer{ .... final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //直接CAS狀態加鎖,非公平操做 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ... //重寫了tryRelease protected final boolean tryRelease(int releases) { c = state - releases; //改變同步狀態 ... //修改volatile 修飾的狀態變量 setState(c); return free; } }
static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } .... static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ....
public class TwinsLock implements Lock { private final Sync sync = new Sync(2); @Override public void lockInterruptibly() throws InterruptedException { throw new RuntimeException(""); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {throw new RuntimeException("");} @Override public Condition newCondition() { return sync.newCondition(); } @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } } @Override public boolean tryLock() { return sync.tryAcquireShared(1) > -1; } }
再來看看Sync的代碼
class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero"); } setState(count); } @Override public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override public boolean tryReleaseShared(int returnCount) { for (; ; ) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } public Condition newCondition() { return new AbstractQueuedSynchronizer.ConditionObject(); } }