(1)LockSupport比Object的wait/notify有兩大優點,分別是什麼?html
(2)LockSupport源碼是如何實現的,具體說說你的見解?java
LockSupport是Java6引入的一個工具類,它簡單靈活,應用普遍。node
1.簡單面試
俗話說,沒有比較就沒有傷害。這裏我們仍是經過對比來介紹LockSupport的簡單。算法
在沒有LockSupport以前,線程的掛起和喚醒我們都是經過Object的wait和notify/notifyAll方法實現。編程
寫一段例子代碼,線程A執行一段業務邏輯後調用wait阻塞住本身。主線程調用notify方法喚醒線程A,線程A而後打印本身執行的結果。併發
public class TestObjWait {ide
public static void main(String[] args)throws Exception {
final Object obj = new Object();
Thread A = new Thread(new Runnable() {
@Override
public void run() {
int sum = 0;
for(int i=0;i<10;i++){
sum+=i;
}
try {
obj.wait();
}catch (Exception e){
e.printStackTrace();
}
System.out.println(sum);
}
});
A.start();
//睡眠一秒鐘,保證線程A已經計算完成,阻塞在wait方法
Thread.sleep(1000);
obj.notify();
}
}函數
執行這段代碼,不難發現這個錯誤工具
java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at com.aaa.TestObjWait$1.run(TestObjWait.java:15)
at java.lang.Thread.run(Thread.java:748)
45
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.notify(Native Method)
at com.aaa.TestObjWait.main(TestObjWait.java:25)
緣由很簡單,wait和notify/notifyAll方法只能在同步代碼塊裏用。因此將代碼修改以下就能夠正常運行
public class TestObjWait {
public static void main(String[] args)throws Exception {
final Object obj = new Object();
Thread A = new Thread(new Runnable() {
@Override
public void run() {
int sum = 0;
for(int i=0;i<10;i++){
sum+=i;
}
try {
synchronized (obj){
obj.wait();
}
}catch (Exception e){
e.printStackTrace();
}
System.out.println(sum);
}
});
A.start();
//睡眠一秒鐘,保證線程A已經計算完成,阻塞在wait方法
Thread.sleep(1000);
synchronized (obj){
obj.notify();
}
}
}
那若是我們換成LockSupport呢?簡單得很,看代碼:
public class TestLo { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { int sum = 0; for (int i = 0; i < 10; i++) { //——步驟二 sum += i; } //——這裏會阻塞 LockSupport.park(); //——步驟四 System.out.println(sum); //——步驟五 }); thread.start(); //——步驟一 Thread.sleep(1000); LockSupport.unpark(thread); //——步驟三 } }
直接調用便可,沒有說非得在同步代碼塊裏才能用,很是easy
2.靈活性
若是隻是LockSupport在使用起來比Object的wait/notify簡單,那還真不必專門講解下LockSupport。最主要的是靈活性。
上邊的例子代碼中,主線程調用了Thread.sleep(1000)方法來等待線程A計算完成進入wait狀態。若是去掉Thread.sleep()調用,代碼以下:
public class TestObjWait {
public static void main(String[] args)throws Exception {
final Object obj = new Object();
Thread A = new Thread(new Runnable() {
@Override
public void run() {
int sum = 0;
for(int i=0;i<10;i++){
sum+=i;
}
try {
synchronized (obj){
obj.wait();
}
}catch (Exception e){
e.printStackTrace();
}
System.out.println(sum);
}
});
A.start();
//睡眠一秒鐘,保證線程A已經計算完成,阻塞在wait方法
//Thread.sleep(1000);
synchronized (obj){
obj.notify();
}
}
}
多運行幾回上面的代碼,有時候可以正常打印結果並退出程序,但有時候線程沒法打印結果阻塞了。
緣由在於:主線程調用完notify後,線程A才進入wait方法,致使線程A一直阻塞了。因爲線程A不是後臺線程,因此整個程序沒法退出。
那若是換成LockSupport呢?
LockSupport就支持主線程先調用unpark後,線程A再調用parl而不被阻塞嗎?是的,沒錯,代碼以下
public class TestObjWait {
public static void main(String[] args)throws Exception {
final Object obj = new Object();
Thread A = new Thread(new Runnable() {
@Override
public void run() {
int sum = 0;
for(int i=0;i<10;i++){
sum+=i;
}
LockSupport.park();
System.out.println(sum);
}
});
A.start();
//睡眠一秒鐘,保證線程A已經計算完成,阻塞在wait方法
//Thread.sleep(1000);
LockSupport.unpark(A);
}
}
無論你執行多少次,這段代碼都能正常打印結果並退出。這就是LockSupport最大的靈活所在。
總結一下,LockSupport比Object的wait/notify有兩大優點:
①LockSupport不須要在同步代碼塊裏。因此線程間也不須要維護一個共享的同步對象了,實現了線程間的解耦。
②unpark函數能夠優先於park調用,因此不須要擔憂線程間的執行前後順序。
應用普遍
LockSupport在Java的工具類用應用很普遍,我們這裏找幾個例子感覺感覺。以Java裏最經常使用的類ThreadPoolExecutor爲例。先看以下代碼:
public class Test001 { public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor (5, 5, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000)); Future<String> future = poolExecutor.submit(new Callable<String>() { @Override public String call() throws InterruptedException { TimeUnit.SECONDS.sleep(5); return "hello"; } }); String result = future.get(); System.out.println(result); } }代碼中,咱們向線程池中
代碼中咱們向線程池中扔了一個任務,而後調用Future的get方法,同步阻塞等待線程池的執行結果。
這裏就要問了:get方法是如何組塞住當前線程?線程池執行完任務後又是如何喚醒線程的呢?
我們跟着源碼一步步分析,先看線程池的submit方法的實現:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
在submit方法裏,線程池將咱們提交的基於Callable實現的任務,封裝爲基於RunnableFuture實現的任務,而後將任務提交到線程池執行,並向當前線程返回RunnableFutrue。
進入newTaskFor方法,就一句話:return new FutureTask<T>(callable);
因此,我們主線程調用future的get方法就是FutureTask的get方法,線程池執行的任務對象也是FutureTask的實例。
接下來看看FutureTask的get方法的實現:
/** * @throws CancellationException {@inheritDoc} */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
比較簡單,就是判斷下當前任務是否執行完畢,若是執行完畢直接返回任務結果,不然進入awaitDone方法阻塞等待。
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //——使用CAS非阻塞算法,將線程封裝爲WaitNode,保存下來,以供後續喚醒線程時使用 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos);//——調用了LockSupport的park/parkNanos阻塞住當前線程。 } else LockSupport.park(this); } }
上邊已經說了阻塞等待任務結果的邏輯,接下來再看看線程池執行完任務,喚醒等待線程的邏輯實現。
前邊說了,我們提交的基於Callable實現的任務,已經被封裝爲FutureTask任務提交給了線程池執行,任務的執行就是FutureTask的run方法執行。以下是FutureTask的run方法:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); //——執行咱們提交的任務 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); //任務執行完後調用set方法,喚醒工做線程的工做應該就是在這裏了 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); //——進入這裏看如何實現的 } }
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //——先經過CAS非阻塞算法將全部等待的線程拿出來 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); //——而後再使用LockSupport的unpark喚醒每一個線程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
在使用線程池的過程當中,不知道你有沒有這麼一個疑問:線程池裏沒有任務時,線程池裏的線程在幹嗎呢?
經過這篇文章《線程池的工做原理與源碼解讀》的讀者必定知道,線程會調用隊列的take方法阻塞等待新任務。那隊列的take方法是否是也跟Future的get方法實現同樣呢?我們來看看源碼實現。
以ArrayBlockingQueue爲例,take方法實現以下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //——使用Lock的Condition的await方法實現線程阻塞,進入裏面看看 return dequeue(); } finally { lock.unlock(); } }
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); //——發現await方法裏仍是使用LockSupport.park方法阻塞本身 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
public static void park(Object blocker) { Thread t = Thread.currentThread();//——獲取調用線程 setBlocker(t, blocker); //——設置該線程的blocker變量 UNSAFE.park(false, 0L); //——掛起線程 setBlocker(t, null); //——線程被激活後,清除blocker變量,由於通常都是在線程阻塞時才分析緣由 }
Thread類裏面有個變量volatile Object parkBlocker,用來存放park方法傳遞的block對象,也就是把block變量存放到了調用park方法的線程的成員變量裏面。
學習要知其然,還要知其因此然。接下來不妨看看LockSupport的實現。
進入LockSupport的park方法,能夠發現它是調用了Unsafe的park方法,這是一個本地native方法,只能經過openjdk的源碼看看其本地實現了。
它調用了線程的Parker類型對象的park方法,以下是Parker類的定義:
類中定義了一個int類型的_counter變量,我們上文中講靈活性的那一節說,能夠先執行unpark後執行park,
就是經過這個變量實現,看park方法的實現代碼(因爲方法比較長就不總體截圖了):
park方法會調用Atomic::xchg方法,這個方法會原子性的將_counter賦值爲0,並返回賦值前的值。若是調用park方法前,
_counter大於0,則說明以前調用過unpark方法,因此park方法直接返回。
接着往下看:
實際上Parker類用Posix的mutex,condition來實現的阻塞喚醒。若是對mutex和condition不熟,能夠簡單理解
爲mutex就是Java裏的synchronized,condition就是Object裏的wait/notify操做。
park方法裏調用pthread_mutex_trylock方法,就至關於Java線程進入Java的同步代碼塊,而後再次判斷_counter
是否大於零,若是大於零則將_counter設置爲零。最後調用pthread_mutex_unlock解鎖,至關於Java執行完退出
同步代碼塊。若是_counter不大於零,則繼續往下執行pthread_cond_wait方法,實現當前線程的阻塞。
最後再看看unpark方法的實現吧,這塊就簡單多了,直接上代碼:
圖中的1和4就至關於Java的進入synchronized和退出synchronized的加鎖解鎖操做,代碼2將_counter設置爲1,
同時判斷先前_counter的值是否小於1,即這段代碼:if(s<1)。若是不小於1,則就不會有線程被park,因此方法
直接執行完畢,不然就會執行代碼3,來喚醒被阻塞的線程。
經過閱讀LockSupport的本地實現,咱們不難發現這麼個問題:屢次調用unpark方法和調用一次unpark方法效果同樣,
由於都是直接將_counter賦值爲1,而不是加1。簡單說就是:線程A連續調用兩次LockSupport.unpark(B)方法喚醒線程B,
而後線程B調用兩次LockSupport.park()方法, 線程B依舊會被阻塞。由於兩次unpark調用效果跟一次調用同樣,
只能讓線程B的第一次調用park方法不被阻塞,第二次調用依舊會阻塞。
Java併發編程之美