前幾天讀LinkedTransferQueue(如下簡稱ltq)的源碼,想加深下對鬆弛型雙重隊列的理解,無心中發現了這個問題:),通過仔細檢查後確認了這是個bug,存在於JDK1.7.0_40和剛發佈的JDK8中,去google和oracle官方彷佛也沒有搜索到這個問題。java
重現bug:先來重現下這個bug,因爲對併發線程的執行順序預先不能作任何假設,因此極可能根本就不存在所謂的重現錯誤的「測試用例」,或者說這個測試用例應該是某種「執行順序"。因此我一開始的作法是copy了一份ltq的源碼,經過某個地方加自旋...可是這種方法畢竟要修改源碼,後來我發現直接debug進源碼就能夠輕易重現bug了。node
LinkedTransferQueue:xfer(E e, boolean haveData, int how, long nanos) if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); if (pred == null) continue retry; // lost race vs opposite mode if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting
在以上06行Node pred = tryAppend(s, havaData) 斷點(我是windows下用eclipse調試);
debug如下代碼:git
public static void main(String[] args) { final BlockingQueue<Long> queue = new LinkedTransferQueue<Long>(); Runnable offerTask = new Runnable(){ public void run(){ queue.offer(8L); System.out.println("offerTask thread has gone!"); } }; Runnable takeTask = new Runnable(){ public void run(){ try { System.out.println(Thread.currentThread().getId() + " " +queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }; Runnable takeTaskInterrupted = new Runnable(){ public void run(){ Thread.currentThread().interrupt(); try { System.out.println(Thread.currentThread().getId() + " " +queue.take()); } catch (InterruptedException e) { System.out.println(e + " "+Thread.currentThread().getId()); } } }; new Thread(offerTask).start(); new Thread(takeTask).start(); new Thread(takeTaskInterrupted).start(); }
執行到斷點處以後,在Debug界面裏面有Thread-0、Thread-一、Thread-2三個線程分別指代代碼中的offerTask、takeTask、takeTaskInterrupted三者。如今執行三步:github
step 1: Resume Thread-1(沒有輸出,線程Thread-1本身掛起,等待數據)
step 2: Resume Thread-2(看到相似於 java.lang.InterruptedException 15 的輸出)
step 3: Resume Thread-0(輸出:offerTask thread has gone!)windows
offer線程已經執行完畢,而後咱們的64L呢,明明Thread-1在等待數據,數據丟失了嗎?其實不是,只不過take線程如今沒法取得offer線程提交的數據了。數據結構
若是你以爲上面的數據丟失還不是什麼大問題請在上面的示例下添加以下代碼(和你CPU核心數相同的代碼行:)併發
.............. new Thread(takeTask).start(); new Thread(takeTask).start(); new Thread(takeTask).start(); new Thread(takeTask).start();
把上面的3個step從新按順序執行一遍,建議先打開任務管理器,接着忽略斷點,讓接下來這幾個線程跑:)
CPU爆滿了吧...實際上是被這幾個線程佔據了,你去掉幾行代碼,CPU使用率會有相應的調整。
因此這個bug可能會引發數據暫時遺失和CPU爆滿, 只不過貌似發生這種狀況的機率極低。oracle
緣由:爲何會出現這個bug呢,要想了解緣由必須先深刻分析ltq內部所使用的數據結構和併發策略,ltq內部採用的是一種很是不一樣的隊列,即鬆弛型雙重隊列(Dual Queues with Slack)。eclipse
數據結構:函數
鬆弛的意思是說,它的head和tail節點相較於其餘併發列隊要求上更放鬆,構造它的目的是減小CAS操做的次數(相應的會增長next域的引用次數),舉個例子:某個瞬間tail指向的節點後面已經有6個節點了(如下圖借用源碼的註釋-_-|||沒畫過圖),而其餘併發隊列真正的尾節點最多隻能是tail的下一個節點。
* head tail
* | |
* v v
* M -> M -> U -> U -> U -> U->U->U->U->U
收縮的方式是大部分狀況下不會用tail的next來設置tail節點,而是第一次收縮N個next(N>=2),而後查看可否2個一次來收縮tail。(head相似,而且head改變一次會致使前「head"節點的next域斷裂即以下圖)
*"prehead" head tail
* | | |
* v v v
* M M-> U -> U -> U -> U->U->U->U->U
雙重是指有兩種類型相互對立的節點(Node.isData==false || true),而且我理解的每種節點都有三種狀態:
1 INIT(節點構造完成,剛進入隊列的狀態)
2 MATCHED(節點備置爲「知足」狀態,即入隊節點標識的線程成功取得或者傳遞了數據)
3 CANCELED(節點被置爲取消狀態,即入隊節點標識的線程由於超時或者中斷決定放棄等待)
(bug的緣由就是現有代碼中將二、3都當作MATCHED處理,後面會看到把3獨立出來就修復了這個問題)
併發策略:
既然使用了鬆弛的雙重隊列,那麼當take、offer等方法被調用時執行的策略也稍微不一樣。
就咱們示例中的代碼的流程來看,Thread-0、Thread-一、Thread-2幾乎同時進入到了xfer的調用,發現隊列爲空,因此都構造了本身的node但願入隊,因而三者都從tail開始加入本身的node,咱們在這裏的順序是Thread-1->Thread-2->Thread-0,由於想要入隊還要和當前的tail節點進行匹配獲得「承認」才能嘗試入隊,隊列爲空Thread-1理所固然入隊成功而且掛起了本身的線程(park)等待相對的調用來喚醒本身(unpark),而後Thread-2發現隊列末尾的node和本身是同一類型的,因而經過了測試把本身也加入了隊列,因爲自己是中斷的因此讓本身進入MATCHED狀態(bug就是這裏了,上面說過CANCEL被當作MATCHED狀態處理),接着咱們提交數據的Thread-0來了,發現末尾節點的類型雖然對立但倒是MATCHED狀態(假如不爲MATCHED會有退回再從head來探測一次的機會),因此認爲隊列已經爲空,前面的調用已經被匹配完了,而後把本身的node入隊,這樣就造成了以下所示的場景:
* Thread-1 Thread-2 Thread-0
* | | |
* v v v
* REQUEST -> MATCHED -> DATA
好了, 如今Thread-3來了,先探測尾部發現Thread-0的node是類型相反的,因而退回從頭部開始從新探測,可是又發現Thread-1的node的類型是相同的,因而再次去探測尾部看看可否入隊.......結果形成CPU是停不下來的。
修復:
如上面所說,錯誤的本質在於當尾部的節點是CANCELED(取消)狀態時不能做爲被匹配完成的MATCHED狀態處理,應該讓後來者回退到head去從新測試一次因此重點是對源碼作出以下修改(修改放在註釋中):
static final class Node { final boolean isData; // false if this is a request node volatile Object item; // initially non-null if isData; CASed to match volatile Node next; volatile Thread waiter; // null until waiting /* static final Object CANCEL = new Object(); final void forgetWaiter(){ UNSAFE.putObject(this, waiterOffset, null); } final boolean isCanceled(){ return item == CANCEL; } */
在Node節點代碼中加入標識取消的對象CANCEL。
private E xfer(E e, boolean haveData, int how, long nanos) { if (item != p && (item != null) == isData /*&& item!=Node.CANCEL*/) { // unmatched if (isData == haveData) // can't match
在xfer函數中添加對於爲狀態爲取消的判斷。
private E xfer(E e, boolean haveData, int how, long nanos) { Node pred = tryAppend(/*s,*/ haveData); ..... } private Node tryAppend(Node s, boolean haveData) { else if (p.cannotPrecede(/*s, */haveData)) else { /* if(p.isCanceled()) p.forgetContents();*/ if (p != t) { // update if slack now >= 2
添加對於前置節點爲取消狀態時當前節點的入隊策略
final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } final boolean cannotPrecede(Node node, boolean haveData) { boolean d = isData; if(d != haveData){ Object x = item; if(x != this && (x!=null) == d && x!= Node.CANCEL) return true; if(item == CANCEL){ if(node.next != this){ node.next = this; return true; } this.forgetContents(); } } node.next = null; return false; }
這一步是關鍵, 當咱們入隊時發現前置節點是類型對立而且取消狀態時,咱們就須要多一次的回退探測,因此借用了一下next域來標識這個CANCEL節點,下次通過時或者能夠確認它能夠當作MATCHED處理(它前面沒有INIT節點)或者已經有別的節點粘接在它後面,咱們就進而處理那個節點,總之當咱們老是可以獲得正確的行爲。
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, /*Node.CANCEL*/)) { // cancel unsplice(pred, s); return e; }
這一處關鍵點把item的值從原來的s自己修改成咱們新增的CANCEL。
額 代碼好亂,關於這個bug定位應該沒問題,後面的緣由不少方面都沒講,剩下的還有不少處大大小小的修改=_=,整個修改以後的LinkedTransferQueue在github上,你們有興趣的話能夠參考下,已經經過了 JSR166測試套件。