public static void main(String[] args)throws Exception { Thread t1 = new Thread(()->{ int sum = 0; for(int i=0;i<10;i++){ sum+=i; } LockSupport.park(); System.out.println(sum); }); t1.start(); Thread.sleep(1000);//等待t1中的求和計算完成,當咱們註釋此行時,則會致使unpark方法先於park方法。可是程序依然可以正確執行。 LockSupport.unpark(t1); }
public static void main(String[] args)throws Exception { ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1000); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1000, TimeUnit.SECONDS,queue); Future<String> future = poolExecutor.submit(()->{//此任務實現了Callable接口 TimeUnit.SECONDS.sleep(5); return "hello"; }); String result = future.get();//同步阻塞等待線程池的執行結果。 System.out.println(result); }
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task);//查看源碼實際上是建立了一個FutureTask對象 execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
//判斷當前任務是否執行完畢,若是執行完畢直接返回任務結果,不然進入awaitDone方法阻塞等待。 public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos);//調用parkNanos方法阻塞線程 } else LockSupport.park(this);//調用park方法組塞住當前線程。 } }
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //c.call()就是執行咱們提交的任務,任務執行完後調用了set方法,進入set方法發現set方法調用了finishCompletion方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//經過cas操做將全部等待的線程拿出來,而後便使用LockSupport的unpark喚醒每一個線程。 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
您有沒有這樣的一個疑問,當線程池裏沒有任務時,線程池裏的線程在幹嗎呢?若是讀過線程池源碼(https://my.oschina.net/cqqcqqok/blog/2049249 )那麼你必定知道,在線程getTask()獲取任務的時候,若是沒有新的任務那麼線程會調用隊列的take方法阻塞等待新任務。那隊列的take方法是否是也跟Future的get方法實現同樣呢?咱們以ArrayBlockingQueue爲例,來看一下源碼實現。ui
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//此處調用了Condition的await方法 return dequeue(); } finally { lock.unlock(); } }
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this);//調用了LockSupport的park方法 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
LockSupport實際上是一個簡單的代理類,它裏面的代碼都是使用Unsafe類裏面的native方法,這裏能夠簡單看看sun.misc.Unsafe 本文主要學習裏面的park和unpark方法。
//isAbsolute表明傳入的time是絕對時間仍是相對時間 public native void park(boolean isAbsolute, long time); public native void unpark(Object thread);
public static void park(Object blocker) { //獲取當前運行線程 Thread t = Thread.currentThread(); //設置t中的parkBlockerOffset(掛起線程對象的偏移地址)的值爲blocker //這個字段能夠理解爲專門爲LockSupport而設計的,它被用來記錄線程是被誰堵塞的,當程序出現問題時候,經過線程監控分析工具能夠找出問題所在。註釋說該對象被LockSupport的getBlocker和setBlocker來獲取和設置,且都是經過地址偏移量方式獲取和修改的。因爲這個變量LockSupport提供了park(Object parkblocker)方法。 setBlocker(t, blocker); //阻塞線程 UNSAFE.park(false, 0L); //線程被釋放,則將parkBlockerOffset設爲null setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }
首先,包含park和unpark的頭函數在:http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/68577993c7db/src/share/vm/runtime/park.hpp 而其具體實現函數則在:http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/95e9083cf4a7/src/os/solaris/vm/os_solaris.cpp
void Parker::park(bool isAbsolute, jlong time) { //_counter字段,就是用來記錄「許可」的。 //先嚐試可否直接拿到「許可」,即_counter>0時,若是成功,則把_counter設置爲0 if (_counter > 0) { _counter = 0 ; OrderAccess::fence(); //直接返回 return ; } //獲取當前線程 Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; //若是中途已是interrupt了,那麼馬上返回,不阻塞 if (Thread::is_interrupted(thread, false)) { return; } //記錄當前絕對時間戳 timespec absTime; //若是park的超時時間已到,則返回 if (time < 0) { // don't wait at all return; } if (time > 0) { //更換時間戳 unpackTime(&absTime, isAbsolute, time); } //進入安全點,利用該thread構造一個ThreadBlockInVM。 ThreadBlockInVM tbivm(jt); if (Thread::is_interrupted(thread, false) || os::Solaris::mutex_trylock(_mutex) != 0) { return; } //記錄等待狀態 int status ; //中途再次給予了許可,則直接返回不等帶。 if (_counter > 0) { // no wait needed _counter = 0; status = os::Solaris::mutex_unlock(_mutex); assert (status == 0, "invariant") ; OrderAccess::fence(); return; } #ifdef ASSERT sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Solaris::allowdebug_blocked_signals(); thr_sigsetmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); jt->set_suspend_equivalent(); #if defined(__sparc) && defined(COMPILER2) if (ClearFPUAtPark) { _mark_fpu_nosave() ; } #endif if (time == 0) { //等待時間爲0時,直接等待 status = os::Solaris::cond_wait (_cond, _mutex) ; } else { //time不爲0,則繼續等待。 status = os::Solaris::cond_timedwait (_cond, _mutex, &absTime); } assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT thr_sigsetmask(SIG_SETMASK, &oldsigs, NULL); #endif _counter = 0 ; status = os::Solaris::mutex_unlock(_mutex); assert_status(status == 0, status, "mutex_unlock") ; if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } OrderAccess::fence(); }
void Parker::unpark() { //定義兩個變量,staus用於判斷是否獲取鎖 int s, status ; //獲取鎖 status = os::Solaris::mutex_lock (_mutex) ; //判斷是否成功 assert (status == 0, "invariant") ; //存儲原先變量_counter s = _counter; //把_counter設爲1 _counter = 1; //釋放鎖 status = os::Solaris::mutex_unlock (_mutex) ; assert (status == 0, "invariant") ; if (s < 1) { //若是原先_counter信號量小於1,即爲0,則進行signal操做,喚醒操做 status = os::Solaris::cond_signal (_cond) ; assert (status == 0, "invariant") ; } }