(1)ConcurrentLinkedQueue是阻塞隊列嗎?java
(2)ConcurrentLinkedQueue如何保證併發安全?node
(3)ConcurrentLinkedQueue能用於線程池嗎?安全
ConcurrentLinkedQueue只實現了Queue接口,並無實現BlockingQueue接口,因此它不是阻塞隊列,也不能用於線程池中,可是它是線程安全的,可用於多線程環境中。多線程
那麼,它的線程安全又是如何實現的呢?讓咱們一塊兒來瞧一瞧。併發
// 鏈表頭節點 private transient volatile Node<E> head; // 鏈表尾節點 private transient volatile Node<E> tail;
就這兩個主要屬性,一個頭節點,一個尾節點。源碼分析
private static class Node<E> { volatile E item; volatile Node<E> next; }
典型的單鏈表結構,很是純粹。線程
public ConcurrentLinkedQueue() { // 初始化頭尾節點 head = tail = new Node<E>(null); } public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; // 遍歷c,並把它元素所有添加到單鏈表中 for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node<E>(null); head = h; tail = t; }
這兩個構造方法也很簡單,能夠看到這是一個無界的單鏈表實現的隊列。rest
由於它不是阻塞隊列,因此只有兩個入隊的方法,add(e)和offer(e)。code
由於是無界隊列,因此add(e)方法也不用拋出異常了。接口
public boolean add(E e) { return offer(e); } public boolean offer(E e) { // 不能添加空元素 checkNotNull(e); // 新節點 final Node<E> newNode = new Node<E>(e); // 入隊到鏈表尾 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 若是沒有next,說明到鏈表尾部了,就入隊 if (q == null) { // CAS更新p的next爲新節點 // 若是成功了,就返回true // 若是不成功就從新取next從新嘗試 if (p.casNext(null, newNode)) { // 若是p不等於t,說明有其它線程先一步更新tail // 也就不會走到q==null這個分支了 // p取到的多是t後面的值 // 把tail原子更新爲新節點 if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. // 返回入隊成功 return true; } } else if (p == q) // 若是p的next等於p,說明p已經被刪除了(已經出隊了) // 從新設置p的值 p = (t != (t = tail)) ? t : head; else // t後面還有值,從新設置p的值 p = (p != t && t != (t = tail)) ? t : q; } }
入隊整個流程仍是比較清晰的,這裏有個前提是出隊時會把出隊的那個節點的next設置爲節點自己。
(1)定位到鏈表尾部,嘗試把新節點放到後面;
(2)若是尾部變化了,則從新獲取尾部,再重試;
由於它不是阻塞隊列,因此只有兩個出隊的方法,remove()和poll()。
public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } public E poll() { restartFromHead: for (;;) { // 嘗試彈出鏈表的頭節點 for (Node<E> h = head, p = h, q;;) { E item = p.item; // 若是節點的值不爲空,而且將其更新爲null成功了 if (item != null && p.casItem(item, null)) { // 若是頭節點變了,則不會走到這個分支 // 會先走下面的分支拿到新的頭節點 // 這時候p就不等於h了,就更新頭節點 // 在updateHead()中會把head更新爲新節點 // 並讓head的next指向其本身 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); // 上面的casItem()成功,就能夠返回出隊的元素了 return item; } // 下面三個分支說明頭節點變了 // 且p的item確定爲null else if ((q = p.next) == null) { // 若是p的next爲空,說明隊列中沒有元素了 // 更新h爲p,也就是空元素的節點 updateHead(h, p); // 返回null return null; } else if (p == q) // 若是p等於p的next,說明p已經出隊了,重試 continue restartFromHead; else // 將p設置爲p的next p = q; } } } // 更新頭節點的方法 final void updateHead(Node<E> h, Node<E> p) { // 原子更新h爲p成功後,延遲更新h的next爲它本身 // 這裏用延遲更新是安全的,由於head節點已經變了 // 只要入隊出隊的時候檢查head有沒有變化就好了,跟它的next關係不大 if (h != p && casHead(h, p)) h.lazySetNext(h); }
出隊的整個邏輯也是比較清晰的:
(1)定位到頭節點,嘗試更新其值爲null;
(2)若是成功了,就成功出隊;
(3)若是失敗或者頭節點變化了,就從新尋找頭節點,並重試;
(4)整個出隊過程沒有一點阻塞相關的代碼,因此出隊的時候不會阻塞線程,沒找到元素就返回null;
(1)ConcurrentLinkedQueue不是阻塞隊列;
(2)ConcurrentLinkedQueue不能用在線程池中;
(3)ConcurrentLinkedQueue使用(CAS+自旋)更新頭尾節點控制出隊入隊操做;
ConcurrentLinkedQueue與LinkedBlockingQueue對比?
(1)二者都是線程安全的隊列;
(2)二者均可以實現取元素時隊列爲空直接返回null,後者的poll()方法能夠實現此功能;
(3)前者全程無鎖,後者所有都是使用重入鎖控制的;
(4)前者效率較高,後者效率較低;
(5)前者沒法實現若是隊列爲空等待元素到來的操做;
(6)前者是非阻塞隊列,後者是阻塞隊列;
(7)前者沒法用在線程池中,後者能夠;
歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。