Java1.5加入的JUC併發包,就像一把好用的瑞士軍刀,極大的豐富了Java處理併發的手段,但JUC並不簡單,有必定的學習成本,我曾經也斷斷續續看過一些JUC的實現源碼,可是既不繫統也不夠深刻,此次決定從新出發,從新拜讀大師Doug Lea的神做,因此本身也是抱着以學代練的心態,記錄本身的學習心得,不免有理解不到位的地方,你們輕噴哈。java
不知道你有沒有這樣的感受,在使用JUC中提供的工具類處理併發時,有一種死記硬背的感受,好比LockSupport應該怎麼用,CountDownLatch能幹嗎,但並不清楚其實現原理,只知道how不知道why,這種狀態有二個比較大的問題。linux
那要深刻,最直接有效的辦法就是閱讀源碼!windows
咱們知道JUC看似有不少類,結構錯綜複雜,可是若是要從中挑出最重要的一個類,那必定是隊列同步器AbstractQueuedSynchronizer, 而AbstractQueuedSynchronizer又是利用LockSupport來控制線程的狀態,從而達到線程在等待喚醒之間切換的目的。而咱們處理併發,重點就是管理線程的狀態,因此理解LockSupport是很重要的一個基礎。bash
先來看一個簡單的例子併發
public static void main(String[] args) {
Thread worker = new Thread(() -> {
LockSupport.park();
System.out.println("start work");
});
worker.start();
System.out.println("main thread sleep");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(worker);
try {
worker.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 最終控制檯輸出結果
main thread sleep
start work
複製代碼
啓動一個worker線程,主線程先sleep 500ms,worker線程由於調用了LockSupport的park,會等待,直到主線程sleep結束,調用unpark喚醒worker線程。那麼在JUC以前,咱們經常使用的讓線程等待的方法以下app
Object monitor = new Object();
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複製代碼
主要有三點區別工具
前面只是鋪墊,如今來到咱們的主菜,解讀LockSupport的park和unpark方法,固然還有一些其餘相似的重載方法,如parkUntil,parkNanos,它們的大致原理相似,感興趣你們能夠自行查閱源碼。oop
這篇文章以及後續的文章,分析的源碼都基於Open Jdk 8。學習
爲何先講unpark方法,由於unpark代碼量少一些,相對簡單,柿子先撿軟的捏-。-ui
//java.util.concurrent.locks.LockSupport.java
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
//
UNSAFE = sun.misc.Unsafe.getUnsafe();
複製代碼
參數thread是咱們要喚醒的目標線程,先判空,而後調用UNSAFE.unpark,UNSAFE是Unsafe對象,不要被這個名字嚇到,這個類提供了不少有用的方法,以前的文章也有提到過,好比獲取類對象中屬性的內存偏移地址,還有 CAS操做等。可是這個Unsafe對象必須使用反射獲得而後才能正常使用,由於getUnsafe方法有判斷當前類加載器是否是BootStrapClassLoader。咱們繼續查看Unsafe類unpark的實現。
// Unsafe.java
public native void unpark(Object thread);
複製代碼
能夠看到unpark是一個native方法,它的native實現是在 hotspot\src\share\vm\prims\unsafe.cpp 看下代碼實現,
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
UnsafeWrapper("Unsafe_Unpark");
// 聲明一個Parker對象p,它是真正幹活的對象
Parker* p = NULL;
if (jthread != NULL) {
// 根據傳入的jthread對象,來獲取native層的oopDesc*對象,oop是oopDesc* 的宏定義
oop java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
// 獲取java_thread對象中_park_event_offset的值,該值就是Parker對象的地址
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// 若是地址有效,直接轉爲Parker指針
p = (Parker*)addr_from_java(lp);
} else {
// 若是地址無效
MutexLocker mu(Threads_lock);
java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
// 轉爲native層的JavaThread對象
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
// 將JavaThread的成員變量_parker賦值給p
p = thr->parker();
if (p != NULL) { // Bind to Java thread for next time.
// 將p的地址賦值給_park_event_offset,下次獲取時可用
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
}
}
if (p != NULL) {
// 這個USDT2的宏,暫時我也不清楚是幹啥的,不過不影響咱們的分析,咱們先忽略
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
HOTSPOT_THREAD_UNPARK(
(uintptr_t) p);
#endif /* USDT2 */
// 真正乾貨的方法,調用了Parker的unpark方法
p->unpark();
}
複製代碼
根據上面的代碼,咱們須要知道二個native層的類,JavaThread類和Parker類
class JavaThread: public Thread {
private:
JavaThread* _next; // The next thread in the Threads list
oop _threadObj; // The Java level thread object
// 省略代碼...
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
// 省略代碼...
複製代碼
JavaThread類很長,這裏只列出幾個成員變量,如今只須要知道它是native層的Thread,成員變量_threadObj是Java層的thread對象,經過它native層能夠調用Java層的代碼。咱們繼續重點看下Parker類的實現。
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
// 省略代碼...
}
複製代碼
咱們重點關注_counter字段,能夠簡單理解爲_counter字段 > 0時,能夠通行,即park方法會直接返回,另外park方法返回後,_counter會被賦值爲0,unpark方法能夠將_counter置爲1,而且喚醒當前等待的線程。
能夠看到Parker的父類是os::PlatformParker,那這個類又是幹嗎的呢?這裏先插個題外話, 咱們都知道,Java是跨平臺的,咱們在應用層定義的Thread確定依賴於具體的平臺,不一樣的平臺有不一樣實現,好比Linux是一套代碼,Windows是另一套,那咱們就能理解了,PlatformParker根據平臺有不一樣的實現。在OpenJdk8的實現中支持5個平臺
咱們知道Linux是如今使用比較普遍的操做系統,好比熟知的Android是基於Linux內核,因此這裏咱們就挑選Linux來分析吧。對應的文件路徑hotspot\src\os\linux\vm\os_linux.cpp
void Parker::unpark() {
int s, status ;
// 先進入_mutex的臨界區,聲明以下
// pthread_mutex_t _mutex [1] ;
// pthread_cond_t _cond [2] ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
// 將_counter置爲1
_counter = 1;
// s記錄的是unpark以前的_counter數,若是s < 1,說明有可能該線程在等待狀態,須要喚醒。
if (s < 1) {
// thread might be parked
// _cur_index表明被使用cond的index
if (_cur_index != -1) {
// thread is definitely parked
// 根據虛擬機參數WorkAroundNPTLTimedWaitHang來作不一樣的處理,默認該參數是1
if (WorkAroundNPTLTimedWaitHang) {
// 先喚醒目標等待的線程
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
// 釋放互斥鎖,先喚醒後釋放鎖,可能會致使線程被喚醒後獲取不到鎖,再次進入等待狀態,個人理解是效率可能會低一丟丟
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
// 先釋放鎖
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
// 後發信號喚醒線程,喚醒操做在互斥代碼塊外部,感受這裏可能會有風險,暫時還GET不到。。。
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
}
} else {
// 若是線程沒有在等待,直接返回
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else {
// 若是線程沒有在等待,直接返回
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
複製代碼
代碼很少,都加了註釋,整體來講就是根據Park類的成員變量_counter來作加鎖解鎖和喚醒操做,在Linux平臺, 加鎖用的pthread_mutex_lock,解鎖是pthread_mutex_unlock,喚醒是pthread_cond_signal 。接下來解析LockSupport的park方法。
先看下Java層park方法的實現
public static void park() {
UNSAFE.park(false, 0L);
}
複製代碼
Unsafe中的實現
public native void park(boolean var1, long var2);
複製代碼
仍然是一個native方法,咱們繼續跟進去看下
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
// 省略代碼...
thread->parker()->park(isAbsolute != 0, time);
// 省略代碼...
複製代碼
省略了非關鍵代碼,重點是park方法,這個thread咱們前面已經遇到過,就是native層的JavaThread對象,而後調用Parker的park方法,繼續跟進去linux平臺的os_linux.cpp的實現
void Parker::park(bool isAbsolute, jlong time) {
// 先原子的將_counter的值設爲0,並返回_counter的原值,若是原值>0說明有通行證,直接返回
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// 判斷線程是否已經被中斷
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
// park方法的傳參是isAbsolute = false, time = 0,因此會繼續往下走
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } // 這裏time爲0,若是調用的是parkNanos或者parkUtil,這裏time就會>0, if (time > 0) { // 若是time > 0,unpackTime計算absTime的時間 unpackTime(&absTime, isAbsolute, time); } ThreadBlockInVM tbivm(jt); // 再次判斷線程是否被中斷,若是沒有被中斷,嘗試得到互斥鎖,若是獲取失敗,直接返回 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } int status ; // 若是_counter > 0, 不須要等待,這裏再次檢查_counter的值 if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // 插入一個寫內存屏障,保證可見性,具體實現見下方 OrderAccess::fence(); return; } // 省略assert代碼 OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); // 設置JavaThread的_suspend_equivalent爲true,表示線程被暫停 jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed // 讓線程等待_cond[_cur_index]信號,到這裏線程進入等待狀態 status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; // 線程進入有超時時間的等待,內部實現調用了pthread_cond_timedwait系統調用 status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; // 省略assert代碼 // _counter從新設置爲0 _counter = 0 ; // 釋放互斥鎖 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // 插入寫屏障 OrderAccess::fence(); // 省略額外檢查代碼 } // OrderAccess::fence 在linux平臺的實現 inline void OrderAccess::fence() { // 若是是多核cpu if (os::is_MP()) { // always use locked addl since mfence is sometimes expensive // 判斷是否AMD64 CPU,彙編代碼實現寫屏障 #ifdef AMD64 __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory"); #else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory"); #endif } } 複製代碼
上面分析了park的實現原理,有了前面unpark方法分析的知識鋪墊,park方法應該很容易看懂。
經過LockSupport的源碼閱讀,能夠總結出一下幾點
最後,仍是想提一下Java層關於線程狀態的小知識,可能有些同窗會不是特別清楚,因此仍是作個總結。 Java線程狀態有如下6種。
關於併發,咱們軟件工程師要作的,就是控制線程在這幾個狀態間正確轉換,所謂「工欲善其事,必先利其器」,JDK提供的各類併發工具類,咱們只有深刻了解它們,才能靈活高效的運用,這也是我記錄"啃透Java併發"系列文章的初心,與君共勉!