一 , 單個task 入到pool的過程 .node
1public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 int c = ctl.get(); 5 if (workerCountOf(c) < corePoolSize) { // 1 6 if (addWorker(command, true)) 7 return; 8 c = ctl.get(); 9 } 10 if (isRunning(c) && workQueue.offer(command)) { // 2 11 int recheck = ctl.get(); 12 if (! isRunning(recheck) && remove(command)) 13 reject(command); 14 else if (workerCountOf(recheck) == 0) 15 addWorker(null, false); 16 } 17 else if (!addWorker(command, false)) // 3 18 reject(command); 19}
worker數 還未到達 核心線程數(corePoolSize),不入隊列,直接使用新線程執行task數組
不然,task數已經達到corePoolSize,將task放到隊列中 .函數
不然,直接嘗試使用新線程執行,失敗的話,執行溢出策略.學習
1private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 // Check if queue empty only if necessary. 8 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 1 9 decrementWorkerCount(); 10 return null; 11 } 12 int wc = workerCountOf(c); 13 // Are workers subject to culling? 14 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 15 if ((wc > maximumPoolSize || (timed && timedOut)) 16 && (wc > 1 || workQueue.isEmpty())) { // 1 17 if (compareAndDecrementWorkerCount(c)) 18 return null; 19 continue; 20 } 21 try { 22 Runnable r = timed ? 23 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 2 24 workQueue.take(); // 3 25 if (r != null) 26 return r; 27 timedOut = true; 28 } catch (InterruptedException retry) { 29 timedOut = false; 30 } 31 } 32 }
以默認的LinkedBlockingQueue 爲例,瞭解一下阻塞隊列.this
屬性:spa
1//節點比較簡單,只是一個單鏈 2static class Node<E> { 3 E item; 4 Node<E> next; 5 Node(E x) { item = x; } 6 }
做爲阻塞隊列,有如下鎖線程
1 /** Lock held by take, poll, etc */ 2 private final ReentrantLock takeLock = new ReentrantLock(); 3 4 /** Wait queue for waiting takes */ 5 private final Condition notEmpty = takeLock.newCondition(); 6 7 /** Lock held by put, offer, etc */ 8 private final ReentrantLock putLock = new ReentrantLock(); 9 10 /** Wait queue for waiting puts */ 11 private final Condition notFull = putLock.newCondition();
隊列的進出用了不一樣的鎖,隊列的進出是能夠同時進行的.code
put()方法對象
1//put 2public void put(E e) throws InterruptedException { 3 if (e == null) throw new NullPointerException(); 4 // Note: put/take/etc 方法都使用本地變量操做 5 // 保持count計數爲負數,表示失敗 . 除非set操做 6 int c = -1; 7 Node<E> node = new Node<E>(e); 8 final ReentrantLock putLock = this.putLock; 9 final AtomicInteger count = this.count; 10 //可被 Interrupt 的鎖 11 putLock.lockInterruptibly(); 12 try { 13 //當已經達到最大容量,那麼阻塞線程,並等待. 14 //count是原子的,因此不用lock 保護 15 // 16 while (count.get() == capacity) { 17 notFull.await(); 18 } 19 //將節點 入隊列 20 enqueue(node); 21 //count數增長 22 c = count.getAndIncrement(); 23 //若是還沒到達最大容量, 24 if (c + 1 < capacity) 25 //那麼喚醒其餘(一個)put操做 26 notFull.signal(); 27 } finally { 28 //put解鎖 29 putLock.unlock(); 30 } 31 if (c == 0) 32 signalNotEmpty(); 33}
offer(E) 方法 , ThreadPoolExecutor內,用的是這個方法隊列
1public boolean offer(E e) { 2 if (e == null) throw new NullPointerException(); 3 final AtomicInteger count = this.count; 4 //若是已經到達最大容量,直接退出 5 if (count.get() == capacity) 6 return false; 7 int c = -1; 8 Node<E> node = new Node<E>(e); 9 final ReentrantLock putLock = this.putLock; 10 //加不可被interrupted的鎖 11 putLock.lock(); 12 try { 13 //還未到達最大容量 14 if (count.get() < capacity) { 15 //入隊列 16 enqueue(node); 17 //count增長 18 c = count.getAndIncrement(); 19 if (c + 1 < capacity) 20 notFull.signal(); 21 } 22 } finally { 23 putLock.unlock(); 24 } 25 // 還未理解 這個條件的場景 26 if (c == 0) 27 //喚醒其餘take鎖 28 signalNotEmpty(); 29 //只要隊列有值 ,那麼就是加入成功 30 return c >= 0; 31}
offer(E e, long timeout, TimeUnit unit) 方法
相對於offer(E)方法,這裏加了個超時
1//當達到最大容量 2while (count.get() == capacity) { 3 //若是等待結束,仍是沒有可用容量(仍是最大容量) 4 if (nanos <= 0) 5 //那麼結束入隊 6 return false; 7 //等待timeout的時間 8 nanos = notFull.awaitNanos(nanos); 9} 10//等待結束,有可用容量,那麼入隊列操做 11enqueue(new Node<E>(e));
poll(long timeout, TimeUnit unit)方法,ThreadPoolExecutor類keepAliveTime,主要就是使用這個方法
1public E poll(long timeout, TimeUnit unit) throws InterruptedException { 2 E x = null; 3 // 假設count爲負數,沒有數據 4 int c = -1; 5 //取timeout的毫秒時間 6 long nanos = unit.toNanos(timeout); 7 final AtomicInteger count = this.count; 8 final ReentrantLock takeLock = this.takeLock; 9 //take鎖,可Interrupt的鎖 10 takeLock.lockInterruptibly(); 11 try { 12 //當count==0時,已經沒有元素了 13 while (count.get() == 0) { 14 //等待結束後,依然沒有元素 15 if (nanos <= 0) 16 //結束,並返回null對象 17 return null; 18 //等待timeout的時間,線程狀態: TIMED_WAIT 19 nanos = notEmpty.awaitNanos(nanos); 20 } 21 //等待結束後,隊列中有元素了. 22 //取隊列的最頂元素 23 x = dequeue(); 24 //count - 1 ,原子操做 25 c = count.getAndDecrement(); 26 if (c > 1) 27 //減完後,隊列中依然有元素,那麼叫醒其餘take 等待鎖 28 notEmpty.signal(); 29 } finally { 30 takeLock.unlock(); 31 } 32 // 這個未理解 33 if (c == capacity) 34 //叫醒其餘全部 put鎖,有空間了,能夠放元素了. 35 signalNotFull(); 36 return x; 37}
take() 方法,ThreadPoolExecutor類,保活的線程,在getTask()時,調用此方法
1public E take() throws InterruptedException { 2 E x; 3 //假設失敗,count爲負數 4 int c = -1; 5 final AtomicInteger count = this.count; 6 final ReentrantLock takeLock = this.takeLock; 7 //可被Interrupt的take鎖,當被Interrupt後,拋出異常 8 takeLock.lockInterruptibly(); 9 try { 10 //當隊列沒有元素後 11 while (count.get() == 0) { 12 //線程等待狀態,除非被其餘線程喚醒 13 //處於永久等待狀態 14 notEmpty.await(); 15 } 16 //取隊列頭的元素 17 x = dequeue(); 18 c = count.getAndDecrement(); 19 if (c > 1) 20 notEmpty.signal(); 21 } finally { 22 takeLock.unlock(); 23 } 24 if (c == capacity) 25 //叫醒其餘全部的put鎖 26 signalNotFull(); 27 return x; 28}
阻塞隊列的阻塞機制,下篇再瞭解下.
// lock已經快分不清了,只憑理論知識 已經不能把這個LinkedBlockingQueue理的明白了.得先學習下Lock了
有如下問題: