Lock鎖源碼分析java
HashMap源碼分析-jdk1.6和jdk1.8的區別算法
ArrayList源碼分析源碼分析
ConcurrentLinkedQueue是一個線程安全的隊列,它採用的是 CAS 算法來進行實現,也就是說它是非阻塞的;隊列中的元素按照 FIFO(先進先出)的原則對元素進行排列,此外,它是一個無界隊列;添加元素的時候,在鏈表的尾部進行添加,獲取元素的時候,從鏈表的頭部獲取。它內部採用的單向鏈表的形式來表示,鏈表的節點是定義在ConcurrentLinkedQueue的一個內部類。性能
ConcurrentLinkedQueue 的類圖以下所示:this
能夠看到 ConcurrentLinkedQueue 實現了 Queue 接口和實現了繼承了 AbstractQueue 類,而 Itr 和 Node則是它的一個內部類;spa
Queue 接口只是定義了一些隊列的公共方法,以下:.net
public interface Queue<E> extends Collection<E> { // 添加元素 boolean add(E e); // 添加元素 boolean offer(E e); // 刪除元素 E remove(); // 刪除並返回第一個元素,若是隊列爲空,則返回 null E poll(); // 返回第一個元素,若是不存在,則拋出NoSuchElementException異常 E element(); // 返回第一個元素,但不刪除,若是隊列爲空,則返回 null E peek(); }
AbstractQueue 類也繼承了 Queue接口,提供了某些方法的實現,以下所示:線程
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } ............................... }
接下來看看 ConcurrentLinkedQueue 的一個實現過程:
首先看一下隊列中鏈表節點的定義,鏈表中的節點使用一個 Node 內部類來表示:
private static class Node<E> { // 節點中的元素 volatile E item; // 下一個節點,沒有上一個節點,表示它是一個單向鏈表的形式 volatile Node<E> next; // 構造一個節點 Node(E item) { UNSAFE.putObject(this, itemOffset, item); } // 使用 CAS 的方式設置節點的元素 boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 設置下一個節點 void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } // 採用 CAS 的方式設置下一個節點 boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe 類的一些初始化 }
能夠看到 Node 類的定義比較簡單,值得注意的地方是 E item 元素和 Node next 節點都使用了 volatile 來修飾,這說明了元素或某個節點被一個線程修改了以後,其餘的線程是立馬看到修改後的值的。
接下來看一下 ConcurrentLinkedQueue 類中的屬性和方法:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { // 頭節點, private transient volatile Node<E> head; // 尾節點,尾節點不必定是鏈表的最後一個節點 private transient volatile Node<E> tail; // 構造 public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } ...................... }
以上能夠看到,頭節點 head 和尾節點 tail 都被 volatile 修飾,節點被一個線程修改了以後,是會把修改的最新的值刷新到主內存中去,當其餘線程去讀取該值的時候,會中主內存中獲取最新的值,也就是一個線程修改了以後,對其餘線程是當即可見的。
當使用空的構造其是實例化一個對象的時候,會建立一個節點,節點的值爲 null(添加的時候,是不能爲null的),並把頭節點和尾節點都指向該節點,以下所示:
以後向鏈表中添加元素,添加元素的時候,是在鏈表的尾部進行添加,添加元素有兩個方法 add() 和 offer(),add() 會調用 offer() 進行添加,這兩個方法永遠都會返回 true,因此不要使用 true | false 來判斷是否添加成功;
public boolean add(E e) { return offer(e); }
public boolean offer(E e) { // 判空,爲空則拋出空指針異常 checkNotNull(e); // 建立要添加的節點 final Node<E> newNode = new Node<E>(e); // 無限循環,入隊不成功,則反覆入隊 // t 表示 tail 節點 // p 表示鏈表的尾節點,默認等於 tail 節點 for (Node<E> t = tail, p = t;;) { // q 爲尾節點的下一個節點 Node<E> q = p.next; // 若是尾節點的下一個節點爲空,則表示 p 爲尾節點 if (q == null) { // CAS 設置尾節點的下一個節點爲新添加的節點,若是設置失敗,在再次嘗試 if (p.casNext(null, newNode)) { // 若是tail節點有大於等於1個的 next 節點,則更新 tail 節點,將新添加的節點設置爲 tail 節點 if (p != t) // 至關於循環兩次更新一次 tail 節點 casTail(t, newNode); // 新添加的節點設置爲tail節點,容許失敗,失敗了表示有其餘線程成功更新了tail節點 return true; } } else if (p == q) // 只有在尾節點和尾節點的下一個節點爲空的狀況下成立 p = (t != (t = tail)) ? t : head; else // 把 tail節點設置爲爲尾節點,再次循環設置下一個節點 p = (p != t && t != (t = tail)) ? t : q; } }
以上方法就是向隊列中添加元素的方法,該方法也是該類中比較難理解的方法;
從上面的代碼能夠看出,入隊主要作兩件事情:第一是將新添加的節點設置成當前隊列尾節點的下一個節點;第二是更新tail節點,若是tail節點的next節點不爲空,則將入隊節點設置成tail節點,若是tail節點的next節點爲空,則將入隊節點設置成tail的next節點,因此tail節點不老是尾節點。
接下來經過圖的方法來看一下元素入隊的一個過程:
初始時隊列以下:
1. 添加第一個節點:
for循環:
for (Node<E> t = tail, p = t;;) { Node<E> q = p.next;
接下來進行 q==null 的判斷:
if (q == null) { if (p.casNext(null, newNode)) { if (p != t) casTail(t, newNode); return true; } }
此時,q==null, 也就是說尾節點的下一個節點爲空,此時把新添加的節點設置爲尾節點的下一個節點,有由於,p節點和 t節點是相等的,因此不會更新 tail 節點;設置成功後,鏈表以下:
2.添加第二個節點:
if (q == null) { .... } else if (p == q) // p = q = null p = (t != (t = tail)) ? t : head; else p = (p != t && t != (t = tail)) ? t : q;
此時, q 不爲空,有p和q不想等,因此只會走 else 分支,p != t 爲false,t != tail 也爲false,全部 把 q 賦給 p ,此時隊列以下:
以後,在走到for循環,此時 q == null, 因此,會把新添加的節點設置爲 p 的下一個節點:
if (q == null) { if (p.casNext(null, newNode)) { if (p != t) casTail(t, newNode); return true; } }
又由於此時,t 節點仍是指向第一個節點,p 指向第一個節點的next節點,t 和 p 不想等,因此執行 casTail(t, newNode); 新添加的節點設置爲tail 節點,設置成功後,鏈表以下所示:
3.添加第三個元素,添加成功後,鏈表以下所示:
4.添加第四個節點,添加成功後,鏈表以下所示:
、
至關於在添加元素時,每循環兩次纔會更新一次 tail 節點。
爲何不讓 tail 節點永遠爲隊列的尾節點,若是讓 tail 節點永遠爲隊列的尾節點,則實現的代碼會更少且邏輯也會更清晰,這是由於,若是讓 tail 永遠爲隊列的尾節點,則每次都須要使用循環CAS更新tail節點,若是能減小更新 tail 節點的次數,入隊的性能豈不是更高?因此說並非每次入隊都須要更新尾節點,只有在tail節點和尾節點不想等的狀況下才更新,這樣能減小更新此時,提升效率。
下面來看看獲取元素的操做,ConcurrentLinkedQueue是一個FIFO的隊列,因此獲取元素的時候,老是獲取到隊列的第一個元素;獲取元素有兩個方法,poll() 和 peek(),poll()方法獲取元素的時候,返回鏈表的第一個元素,並刪除,而 peek() 方法獲取元素的時候則不刪除,下面看一下獲取元素的主要代碼邏輯:
public E poll() { // 循環跳轉,goto語法 restartFromHead: for (;;) { // p 表示要出隊的節點,默認爲 head節點 for (Node<E> h = head, p = h, q;;) { // 出隊的元素 E item = p.item; // 若是出隊的元素不爲空,則把要出隊的元素設置null,不更新head節點;若是出隊元素爲null或者cas設置失敗,則表示有其餘線程已經進行修改,則須要重寫獲取 if (item != null && p.casItem(item, null)) { if (p != h) // 當head元素爲空,纔會更新head節點,這裏循環兩次,更新一次head節點 updateHead(h, ((q = p.next) != null) ? q : p); // 更新head節點 return item; } // 隊列爲空,返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; // 把 p 的next節點賦值給p else p = q; } } }
下面經過圖的方式來查看獲取節點的過程:
首先隊列以下所示:
以後去獲取節點 item0,首先進行標記:
for (Node<E> h = head, p = h, q;;) { E item = p.item; .............................. }
標記以下:
由於此時 p.item 不爲空,所示 CAS 設置 p.item 爲null,走第一個if語句,又由於此時 h 節點的 item 不爲空,即 p 和 h 相等,因此不會更新頭節點,直接返回 p 節點中的元素 item0,以下圖所示:
if (item != null && p.casItem(item, null)) { // 不會執行 if (p != h) // updateHead(h, ((q = p.next) != null) ? q : p); return item; }
接下來再獲取 item1,進行標記,以下所示:
由於 p.item 爲空,因此不知足條件 if (item != null && p.casItem(item, null)), 以後執行 else if ((q = p.next) == null),此時,q指向了 item1,
又由於 q 不爲空,全部不會更新head節點,以後會執行最後一個else語句:p = q,第一次循環結束,開始第二次循環,又進行標記,此時標記以下:
由於 p.item 不爲空,因此走以下代碼邏輯,經過 CAS 把 p.item 設置爲null,
if (item != null && p.casItem(item, null)) { if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
由於此時,head節點元素爲空,即 p 和 h 節點不相等,因此會更新頭節點,又由於 p.next 即 item2 不爲空,因此把 p.next 即 item2 設置爲 head節點,設置成功後,隊列以下所示:
以後獲取item2也是一樣的邏輯,獲取 items2 後隊列以下:
獲取 item3 後以下所示:
以上就是從隊列中獲取元素的主要代碼邏輯,從上可知,head節點不必定就是隊列的第一個含有元素的節點,也不是每次獲取元素後就更新head節點,只有當head中的元素爲空的時候才更新head節點,這和添加 offer() 方法中更新tail節點相似,減小 CAS 更新head節點的次數,出隊的效率會更高。
ConcurrentLinkedQueue 經過 isEmpty來判斷隊列是否爲空,代碼以下:
public boolean isEmpty() { return first() == null; }
Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
isEmpty 方法會判斷鏈表的第一個元素是否爲空來進行判斷的。
public int size() { int count = 0; // succ() 獲取下一個元素 for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) if (++count == Integer.MAX_VALUE) break; return count; }
size()方法會遍歷全部的鏈表來查看有多少個元素。
對於在開發的時候,若是須要判斷是否爲空,則應該使用 isEmpty 而不該該使用 size() > 0 的方式,由於 size()會變量整個鏈表,效率較低。
ConcurrentLinkedQueue 類還有其餘的一些方法,只有理解了入隊和出隊的方法 offer() 和 poll() 方法,其餘方法就很好理解了。
/** * @ Author:lenovo * @ Date:Created in 下午 7:22 2018/7/17 0017 */ public class ConcurrentLinkedQueueTest { public static void main(String[] args) throws InterruptedException { new ConcurrentLinkedQueueTest().testConcurrentLinkedQueue(); Thread.sleep(5000L); } int num = 0; public ConcurrentLinkedQueue testConcurrentLinkedQueue(){ ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); for(int i = 0; i < 100; i++) { new Thread(() -> { num++; queue.offer(num); }).start(); } return queue; } }