http://m.blog.csdn.net/blog/luoyuyou/30256497java
背景:node
一個BlockingQueue的是一個這樣的隊列,每一個插入操做都必須等待另外一個刪除操做,反過來也同樣。一個同步隊列沒有內部容量這個概念。你不能使用peek操做,由於一個元素僅在你試着刪除它的時候纔可以被取得。你不能插入一個元素(任何方法),直到另外一個線程試着刪除它。你不能迭代它們,由於沒有東西能夠被迭代。queue的頭元素head是第一個進入隊列中的元素,而且插入線程正在爲它等待。若是隊列中沒有像以上的元素存在,那麼調用poll的方法會返回null。對於Collection的其餘方法(好比contains),SynchronousQueue表現得像一個空的集合。它不容許null入隊。
算法
這個隊列相似於CSP和Ada中使用的會合信道。它們適合於切換的設計,好比一個線程中的對象必須同步等待另外一個線程中運行的對象從而傳遞一些信息/事件/任務。數據結構
這個類支持可選的公平策略從而制訂生產者和等待者的等待順序。默認狀況下,這個順序是沒有保證的,使用true能夠確保隊列是FIFO的。app
這個類以及它的迭代器實現了某些Collection和Iterator中的方法。less
算法:ide
這個算法實現了雙重棧和雙重隊列算法。oop
(LIfo)棧用做非公平模式,(Fifo)隊列用於公平模式。這二者的性能類似。Fifo常常能在競爭狀況下提供更高的吞吐量,可是Lifo可以在通常應用中維持更高的線程局部性。性能
雙重隊列(以及類似的棧)在任什麼時候刻都是持有「數據」--item經過put操做提供或者「請求」--slo經過take操做提供,或者爲空。一個調用試着「知足」(一個請求的調用獲得數據或者一個數據的調用匹配了請求)結果是出隊了一個模式互補的節點。最有趣的的地方在於任何操做都可以明確當前隊列處於哪一種模式,以及表現得好像不須要鎖。
this
隊列和棧都擴展了抽象的Transferer接口,它定義了惟一一個transfer方法用於put或者take。這些方法統一在一個方法下是由於在雙重數據結構中,put和take方法是對稱的兩種方法,因此幾乎全部代碼能夠被組合。結果transfer方法是比較長的,可是這樣相對於把他們分紅幾乎重複的幾部分代碼仍是更好的。
這個隊列和棧共享了不少類似的概念可是不多的具體細節。爲了簡單性,他們保持不一樣從而在後續能夠分開演化。
這個同步隊列的算法不一樣於之前的算法,包括消除的機制。主要的不一樣在於:
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
這裏的基本算法是循環試着執行下列行動之一:
接着咱們來看阻塞操做awaitFulfill:
/** * Spins/blocks until node s is fulfilled. * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */ Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
這裏的s是根據調用構造的節點,e爲傳遞的數據(null是請求,不然數據),timed爲限時標示,nanos時間量(納秒):
咱們最後看transfer方法中的清理操做:
void clean(QNode pred, QNode s) { s.waiter = null; // forget thread /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, * if we cannot delete s, we save its predecessor as * "cleanMe", deleting the previously saved version * first. At least one of node s or the node previously * saved can always be deleted, so this always terminates. */ while (pred.next == s) { // Return early if already unlinked QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail if (t == h) return; QNode tn = t.next; if (t != tail) continue; if (tn != null) { advanceTail(t, tn); continue; } if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
在任一時刻,只有一個鏈表中的節點不能被刪除---最後一個插入的節點。爲了遷就這個原則,若是咱們沒法刪除s,那麼咱們就保存它的前驅節點爲「cleanMe」,首先刪除以前保存的版本。因此至少s或者以前保存的節點可以被刪除,因此最後老是可以被刪除!
詳情以下:
if (dp == pred)這一行在我看來是不必的,由於永遠不可能出現。
咱們接着分析stack:
stack
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; if (h == null || h.mode == mode) { // empty or same-mode if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
這裏的Node實際上擁有三種狀態:REQUEST/DATA/FULFILLING,基本算法是循環試着執行下列3種行動之一:
/** * Spins/blocks until node s is matched by a fulfill operation. * * @param s the waiting node * @param timed true if timed wait * @param nanos timeout value * @return matched node, or s if cancelled */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
這個操做事實上和隊列版本中的相似,首先來解釋下注釋中的內容:當一個節點/線程試着去阻塞,它會在設置waiter域以後至少檢查一次狀態,而後纔會調用parking(阻塞),這樣子能夠經過waiter從而和它的知足者協做從而確保不會丟失信號。若是當前調用的幾點位於棧頂,那麼在park以前會首先嚐試自旋,這樣能夠在生產者和消費者很是接近時避免阻塞。可是這個只在多核處理器下才會有用。從代碼中的檢查狀況能夠看出,在優先級上,中斷狀態--->正式返回---->超時。(因此最後一個檢查是用來探測超時的)除了非限時的同步隊列。{poll/offer}方法不會檢查中斷以及等待過久,因此對於中斷和超時的判斷被放置於transfer方法中,這樣要好於調用awaitFulfill。詳情以下(與隊列版本中相似):取得當前的結束時間,當前線程,以及自旋次數。而後進入循環。首先判斷是否中斷,判斷限時版本下的時間流逝,判斷自旋,以及根據當前節點s所處的位置來設置自旋次數。設置線程對象(用於喚醒)。最後根據是否限時來阻塞當前線程,限時版本下會根據閾值來判斷是否須要阻塞。最後咱們來看處理中斷和超時狀況下的清理操做clean:
void clean(SNode s) { s.item = null; // forget item s.waiter = null; // forget thread /* * At worst we may need to traverse entire stack to unlink * s. If there are multiple concurrent calls to clean, we * might not see s if another thread has already removed * it. But we can stop when we see any node known to * follow s. We use s.next unless it too is cancelled, in * which case we try the node one past. We don't check any * further because we don't want to doubly traverse just to * find sentinel. */ SNode past = s.next; if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // Unsplice embedded nodes while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }