在使用cachedThreadPool的時候,沒有對原理去很好的理解,因此致使使用起來有些不放心,主要是對synchronousQueue的原理不太瞭解,因此有此文的分析java
本文從兩個方面分析synchronousQueue:node
首先說說api吧,關於隊列有幾套api,核心是下面的兩套:算法
take() & put() //這是阻塞的,會阻塞操做線程
poll() & offer() //這是非阻塞的(在不設置超時時間的前提下),當操做不能達成的時候會立馬返回boolean複製代碼
synchronousQueue是一個沒有數據緩衝的阻塞隊列,生產者線程對其的插入操做put()必須等待消費者的移除操做take(),反過來也同樣。api
可是poll()和offer()就不會阻塞,舉例來講就是offer的時候若是有消費者在等待那麼就會立馬知足返回true,若是沒有就會返回false,不會等待消費者到來。數據結構
下面咱們分析一下cachedThreadPool的使用流程,經過這個過程咱們來了解synchronousQueue的使用方式:先看代碼併發
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//1
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}複製代碼
根據上面咱們介紹的synchronousQueue的隊列語義,咱們其實能夠很容易的經過鎖或者信號量等一系列的同步機制來實現一個synchronousQueue的結構,可是咱們知道有鎖的通常效率都不會過高,因此java爲咱們提供了下面一種無鎖的算法。框架
無鎖在java裏面通常就是cas和spin來實現的!具體會在以後介紹java併發包的時候來分析TODO。下面的分析的核心在於:cas和spin,通常把cas和spin能夠組合起來使用,spin就是不斷循環重試cas操做,確保操做可以成功。這些就不詳細介紹,網上有不少相關文章!這也是java併發的基礎!ide
稍微跟蹤一下代碼,就會發現synchronousQueue內部是經過Transferer來實現的,具體分爲兩個Transferer,分別是TransferStack和TransferQueue,二者差異在因而否公平:下面咱們只分析TransferQueue的實現。this
/** Node class for TransferQueue. */
static final class QNode { //1
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
boolean casNext(QNode cmp, QNode val) {//2
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {//2
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
}複製代碼
以後是transfer操做idea
E transfer(E e, boolean timed, long nanos) {//1
QNode s = null;
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null)
continue;
if (h == t || t.isData == isData) { //2
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0)
return null;
if (s == null)
s = new QNode(e, isData);//3
if (!t.casNext(null, s)) //4
continue;
advanceTail(t, s); // 5
Object x = awaitFulfill(s, e, timed, nanos);//6
if (x == s) {
clean(t, s);
return null;
}
if (!s.isOffList()) {
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { //7
QNode m = h.next; //8
if (t != tail || m == null || h != head)
continue;
Object x = m.item;
if (isData == (x != null) ||
x == m ||
!m.casItem(x, e)) { // 9
advanceHead(h, m);
continue;
}
advanceHead(h, m); // 10
LockSupport.unpark(m.waiter);//11
return (x != null) ? (E)x : e;
}
}
}複製代碼
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {//6.1
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)//6.2
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);//6.3
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}複製代碼
在上面的代碼中,我把重要的地方分了11步,分別進行解釋:
首先說一下大體的操做。在transfer中,把操做分爲兩種,一種就是入隊put,一種是出隊take,入隊的時候會建立data節點,值爲data。出隊的時候會建立一個request節點,值爲null。
put和take操做都會調用該方法,區別在於,put操做的時候e值爲數據data,take操做的時候e值爲null
若是h==t也就是隊列爲空,或者當前隊列尾部的數據類型和調用該方法的數據類型一致:好比當前隊列爲空,第一次來了一個入隊請求,這時候隊列就會建立出一個data節點,若是第二次又來了一個入隊請求(和第一次也就是隊列尾部的數據類型一致,都是入隊請求),這時候隊列會建立出第二個data節點,並造成一個鏈表。同理,若是剛開始來了request請求,也會入隊,以後若是繼續來了一個reqeust請求,也會繼續入隊!
知足2的條件,就會進入3,中間會有一些一致性檢查這也是必須的,避免產生併發衝突。3會建立出一個節點,根據e值的不一樣,多是data節點或者request節點。
把3中建立的節點經過cas方式設置到隊列尾部去。
把tail經過cas方式修改爲3中新創建的s節點
調用方法awaitFulfill進行等待,若是3中建立的是data節點,那麼就會等待來一個reqeust節點,反之亦然!