LockSupport是用來建立locks的基本線程阻塞基元,好比AQS中實現線程掛起的方法,就是park,對應喚醒就是unpark。JDK中有使用的以下html
LockSupport提供的是一個許可,若是存在許可,線程在調用park
的時候,會立馬返回,此時許可也會被消費掉,若是沒有許可,則會阻塞。調用unpark的時候,若是許可自己不可用,則會使得許可可用java
許可只有一個,不可累加
park的聲明形式有一下兩大塊linux
一部分多了一個Object參數,做爲blocker,另外的則沒有。blocker的好處在於,在診斷問題的時候可以知道park的緣由app
推薦使用帶有Object的park操做
park用於掛起當前線程,若是許可可用,會立馬返回,並消費掉許可。函數
以park的源碼爲例oop
public static void park(Object blocker) { //獲取當前線程 Thread t = Thread.currentThread(); //記錄當前線程阻塞的緣由,底層就是unsafe.putObject,就是把對象存儲起來 setBlocker(t, blocker); //執行park unsafe.park(false, 0L); //線程恢復後,去掉阻塞緣由 setBlocker(t, null); }
從源碼能夠看到真實的實現均在 unsafe性能
核心實現以下ui
JavaThread* thread=JavaThread::thread_from_jni_environment(env); ... thread->parker()->park(isAbsolute != 0, time);
就是獲取java線程的parker對象,而後執行它的park方法。Parker的定義以下this
class Parker : public os::PlatformParker { private: //表示許可 volatile int _counter ; Parker * FreeNext ; JavaThread * AssociatedWith ; // Current association public: Parker() : PlatformParker() { //初始化_counter _counter = 0 ; FreeNext = NULL ; AssociatedWith = NULL ; } protected: ~Parker() { ShouldNotReachHere(); } public: void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ; };
它繼承了os::PlatformParker,內置了一個volatitle的 _counter。PlatformParker則是在不一樣的操做系統中有不一樣的實現,以linux爲例spa
class PlatformParker : public CHeapObj { protected: //互斥變量類型 pthread_mutex_t _mutex [1] ; //條件變量類型 pthread_cond_t _cond [1] ; public: ~PlatformParker() { guarantee (0, "invariant") ; } public: PlatformParker() { int status; //初始化條件變量,使用 pthread_cond_t以前必須先執行初始化 status = pthread_cond_init (_cond, NULL); assert_status(status == 0, status, "cond_init」); // 初始化互斥變量,使用 pthread_mutex_t以前必須先執行初始化 status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); } }
上述代碼均爲POSIX線程接口使用,因此pthread指的也就是posixThread
parker實現以下
void Parker::park(bool isAbsolute, jlong time) { if (_counter > 0) { //已經有許可了,用掉當前許可 _counter = 0 ; //使用內存屏障,確保 _counter賦值爲0(寫入操做)可以被內存屏障以後的讀操做獲取內存屏障事前的結果,也就是可以正確的讀到0 OrderAccess::fence(); //當即返回 return ; } Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; if (Thread::is_interrupted(thread, false)) { // 線程執行了中斷,返回 return; } if (time < 0 || (isAbsolute && time == 0) ) { //時間到了,或者是表明絕對時間,同時絕對時間是0(此時也是時間到了),直接返回,java中的parkUtil傳的就是絕對時間,其它都不是 return; } if (time > 0) { //傳入了時間參數,將其存入absTime,並解析成absTime->tv_sec(秒)和absTime->tv_nsec(納秒)存儲起來,存的是絕對時間 unpackTime(&absTime, isAbsolute, time); } //進入safepoint region,更改線程爲阻塞狀態 ThreadBlockInVM tbivm(jt); if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { //若是線程被中斷,或者是在嘗試給互斥變量加鎖的過程當中,加鎖失敗,好比被其它線程鎖住了,直接返回 return; } //這裏表示線程互斥變量鎖成功了 int status ; if (_counter > 0) { // 有許可了,返回 _counter = 0; //對互斥變量解鎖 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; OrderAccess::fence(); return; } #ifdef ASSERT // Don't catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) //debug用 sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif //將java線程所擁有的操做系統線程設置成 CONDVAR_WAIT狀態 ,表示在等待某個條件的發生 OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); //將java的_suspend_equivalent參數設置爲true jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() if (time == 0) { //把調用線程放到等待條件的線程列表上,而後對互斥變量解鎖,(這兩是原子操做),這個時候線程進入等待,當它返回時,互斥變量再次被鎖住。 //成功返回0,不然返回錯誤編號 status = pthread_cond_wait (_cond, _mutex) ; } else { //同pthread_cond_wait,只是多了一個超時,若是超時尚未條件出現,那麼從新獲取胡吃兩而後返回錯誤碼 ETIMEDOUT status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { //WorkAroundNPTLTimedWaitHang 是JVM的運行參數,默認爲1 //去除初始化 pthread_cond_destroy (_cond) ; //從新初始化 pthread_cond_init (_cond, NULL); } } assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif //等待結束後,許可被消耗,改成0 _counter = 0 ; //釋放互斥量的鎖 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } //加入內存屏障指令 OrderAccess::fence(); }
從park的實現能夠看到
在linux中實現原理以下
inline void OrderAccess::fence() { if (os::is_MP()) { #ifdef AMD64 // 沒有使用mfence,由於mfence有時候性能差於使用 locked addl __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory"); #else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory"); #endif } }
內存重排序網上的驗證
這屬於C++新建變量的語法,也就是調用構造函數新建了一個變量,變量名爲tbivm,參數爲jt。類的實現爲
class ThreadBlockInVM : public ThreadStateTransition { public: ThreadBlockInVM(JavaThread *thread) : ThreadStateTransition(thread) { // Once we are blocked vm expects stack to be walkable thread->frame_anchor()->make_walkable(thread); //把線程由運行狀態轉成阻塞狀態 trans_and_fence(_thread_in_vm, _thread_blocked); } ... };
_thread_in_vm 表示線程當前在VM中執行,_thread_blocked表示線程當前阻塞了,他們是globalDefinitions.hpp
中定義的枚舉
//這個枚舉是用來追蹤線程在代碼的那一塊執行,用來給 safepoint code使用,有4種重要的類型,_thread_new/_thread_in_native/_thread_in_vm/_thread_in_Java。形如xxx_trans的狀態都是中間狀態,表示線程正在由一種狀態變成另外一種狀態,這種方式使得 safepoint code在處理線程狀態時,不須要對線程進行掛起,使得safe point code運行更快,而給定一個狀態,經過+1就能夠獲得他的轉換狀態 enum JavaThreadState { _thread_uninitialized = 0, // should never happen (missing initialization) _thread_new = 2, // just starting up, i.e., in process of being initialized _thread_new_trans = 3, // corresponding transition state (not used, included for completeness) _thread_in_native = 4, // running in native code . This is a safepoint region, since all oops will be in jobject handles _thread_in_native_trans = 5, // corresponding transition state _thread_in_vm = 6, // running in VM _thread_in_vm_trans = 7, // corresponding transition state _thread_in_Java = 8, // Executing either interpreted or compiled Java code running in Java or in stub code _thread_in_Java_trans = 9, // corresponding transition state (not used, included for completeness) _thread_blocked = 10, // blocked in vm _thread_blocked_trans = 11, // corresponding transition state _thread_max_state = 12 // maximum thread state+1 - used for statistics allocation };
父類ThreadStateTransition中定義trans_and_fence以下
void trans_and_fence(JavaThreadState from, JavaThreadState to) { transition_and_fence(_thread, from, to);} //_thread即構造函數傳進來de thread // transition_and_fence must be used on any thread state transition // where there might not be a Java call stub on the stack, in // particular on Windows where the Structured Exception Handler is // set up in the call stub. os::write_memory_serialize_page() can // fault and we can't recover from it on Windows without a SEH in // place. //transition_and_fence方法必須在任何線程狀態轉換的時候使用 static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) { assert(thread->thread_state() == from, "coming from wrong thread state"); assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states"); //標識線程轉換中 thread->set_thread_state((JavaThreadState)(from + 1)); // 設置內存屏障,確保新的狀態可以被VM 線程看到 if (os::is_MP()) { if (UseMembar) { // Force a fence between the write above and read below OrderAccess::fence(); } else { // Must use this rather than serialization page in particular on Windows InterfaceSupport::serialize_memory(thread); } } if (SafepointSynchronize::do_call_back()) { SafepointSynchronize::block(thread); } //線程狀態轉換成最終的狀態,對待這裏的場景就是阻塞 thread->set_thread_state(to); CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();) }
在osThread中給定了操做系統線程狀態的大體取值,它自己是依據平臺而定
enum ThreadState { ALLOCATED, // Memory has been allocated but not initialized INITIALIZED, // The thread has been initialized but yet started RUNNABLE, // Has been started and is runnable, but not necessarily running MONITOR_WAIT, // Waiting on a contended monitor lock CONDVAR_WAIT, // Waiting on a condition variable OBJECT_WAIT, // Waiting on an Object.wait() call BREAKPOINTED, // Suspended at breakpoint SLEEPING, // Thread.sleep() ZOMBIE // All done, but not reclaimed yet };
實現以下
void Parker::unpark() { int s, status ; //給互斥量加鎖,若是互斥量已經上鎖,則阻塞到互斥量被解鎖 //park進入wait時,_mutex會被釋放 status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; //存儲舊的_counter s = _counter; //許可改成1,每次調用都設置成發放許可 _counter = 1; if (s < 1) { //以前沒有許可 if (WorkAroundNPTLTimedWaitHang) { //默認執行 ,釋放信號,代表條件已經知足,將喚醒等待的線程 status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; //釋放鎖 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; } } else { //一直有許可,釋放掉本身加的鎖,有許可park自己就返回了 pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } }
從源碼可知unpark自己就是發放許可,並通知等待的線程,已經能夠結束等待了