正如上篇文章聊聊 JDK 阻塞隊列源碼(ReentrantLock實現)所說,隊列在咱們現實生活中隊列隨處可見,最經典的就是去銀行辦理業務,超市買東西排隊等。今天樓主要講的就是JDK中安全隊列的另外一種實現使用CAS算法實現的安全隊列。
html
在JDK中的隊列都實現了 java.util.Queue 接口,下面就是樓主要說的無鎖版本的隊列實現:java
隊列名字 | 是否加鎖 | 數據結構 |
---|---|---|
LinkedTransferQueue | 否 | 鏈表 |
ConcurrentLinkedQueue | 否 | 鏈表 |
LinkedTransferQueue 的原理就是經過使用原子變量compare and swap(簡稱「CAS」)這種不加鎖的方式來實現的進行併發控制,LinkedTransferQueue是一個無界的安全隊列,其長度能夠無限延伸,固然其帶來的問題也是顯而易見的。
node
add
方法:
算法
public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
offer
方法:
安全
public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; }
poll
方法:
數據結構
public E poll() { return xfer(null, false, NOW, 0); }
take
方法:
併發
public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
從上面代碼中能夠看出,這些方法最終都指向了 xfer
方法,只不過傳入的不一樣的參數。app
/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take * @param haveData true if this is a put, else a take * @param how NOW, ASYNC, SYNC, or TIMED * @param nanos timeout in nanosecs, used only if mode is TIMED * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */
從源碼的 doc 註釋中能夠知道
第一個參數,若是是 put 類型,就是實際的值,反之就是 null。
第二個參數,是否包含數據,put 類型就是 true,take 就是 false。
第三個參數,執行類型,有當即返回的NOW,有異步的ASYNC,有阻塞的SYNC, 有帶超時的 TIMED。
第四個參數,只有在 TIMED類型纔有做用。less
接下來咱們來看看 xfer
究竟是何方神聖異步
private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race // 從 head 開始 for (Node h = head, p = h; p != null;) { // find & match first node // head 的類型。 boolean isData = p.isData; // head 的數據 Object item = p.item; // item != null 有 2 種狀況,一是 put 操做, 二是 take 的 itme 被修改了(匹配成功) // (itme != null) == isData 要麼表示 p 是一個 put 操做, 要麼表示 p 是一個還沒匹配成功的 take 操做 if (item != p && (item != null) == isData) { // 若是當前操做和 head 操做相同,就沒有匹配上,結束循環,進入下面的 if 塊。 if (isData == haveData) // can't match break; // 若是操做不一樣,匹配成功, 嘗試替換 item 成功, if (p.casItem(item, e)) { // match // 更新 head for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 喚醒原 head 線程. LockSupport.unpark(p.waiter); return LinkedTransferQueue.<E>cast(item); } } // 找下一個 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } // 若是這個操做不是馬上就返回的類型 if (how != NOW) { // No matches available // 且是第一次進入這裏 if (s == null) // 建立一個 node s = new Node(e, haveData); // 嘗試將 node 追加對隊列尾部,並返回他的上一個節點。 Node pred = tryAppend(s, haveData); // 若是返回的是 null, 表示不能追加到 tail 節點,由於 tail 節點的模式和當前模式相反. if (pred == null) // 重來 continue retry; // lost race vs opposite mode // 若是不是異步操做(即馬上返回結果) if (how != ASYNC) // 阻塞等待匹配值 return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
代碼有點長,其實邏輯很簡單。就是找到 head
節點,若是 head
節點是匹配的操做,就直接賦值,若是不是,添加到隊列中。
注意:隊列中永遠只有一種類型的操做,要麼是 put
類型, 要麼是 take
類型.
與 LinkedTransferQueue
同樣,ConcurrentLinkedQueue 同樣是採用原子變量實現的併發控制,ConcurrentLinkedQueue
是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部,當咱們獲取一個元素時,它會返回隊列頭部的元素。它採用了「wait-free」算法來實現。
add
方法:
public boolean add(E e) { return offer(e); }
offer
方法:
ConcurrentLinkedQueue
是無界的,因此offer
永遠返回true,不能經過返回值來判斷是否入隊成功,
public boolean offer(E e) { // 校驗是否爲空 checkNotNull(e); //入隊前,建立一個入隊節點 final Node<E> newNode = new Node<E>(e); //循環CAS直到入隊成功。 // 一、根據tail節點定位出尾節點(last node); // 二、將新節點置爲尾節點的下一個節點, // 三、更新尾節點casTail。 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //判斷p是否是尾節點,tail節點不必定是尾節點,判斷是否是尾節點的依據是該節點的next是否是null if (q == null) { // p is last node if (p.casNext(null, newNode)) { //設置P節點的下一個節點爲新節點,若是p的next爲null,說明p是尾節點,casNext返回true; // 若是p的next不爲null,說明有其餘線程更新過隊列的尾節點,casNext返回false。 // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) //p節點是null的head節點恰好被出隊,更新head節點時h.lazySetNext(h)把舊的head節點指向本身 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. //判斷tail節點有沒有被更新,若是沒被更新,1)p=q:p指向p.next繼續尋找尾節點; //若是被更新了,2)p=t:P賦值爲新的tail節點 p = (p != t && t != (t = tail)) ? t : q; } }
poll
方法:
public E poll() { restartFromHead: //兩層循環 for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } //隊列爲空,更新head節點 else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) //p節點是null的head節點恰好被出隊,更新head節點時h.lazySetNext(h);把舊的head節點指向本身。 //從新從head節點開始 continue restartFromHead; else p = q; //將p執行p的下一個節點 } } } //更新head節點 final void updateHead(Node<E> h, Node<E> p) { //經過CAS將head更新爲P if (h != p && casHead(h, p)) h.lazySetNext(h);//把舊的head節點指向本身 } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
remove
方法:
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; // 循環CAS直到刪除節點 for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; if (item != null) { if (!o.equals(item)) { next = succ(p); continue; } // 經過CAS刪除節點 removed = p.casItem(item, null); } next = succ(p); if (pred != null && next != null) // unlink pred.casNext(p, next); if (removed) return true; } } return false; }
本文主要介紹了兩種CAS算法實現的安全隊列,然而穩定性要較高的系統中,爲了防止生產者速度過快,致使內存溢出,一般是不建議選擇無界隊列的。固然樓主水平有限,文章中難免有紕漏,望小夥伴諒解並指出,在技術的道路上一塊兒成長。