要實現一個線程安全的隊列有兩種方式:阻塞和非阻塞。阻塞隊列無非就是鎖的應用,而非阻塞則是CAS算法的應用。下面咱們就開始一個非阻塞算法的研究:CoucurrentLinkedQueue。html
ConcurrentLinkedQueue是一個基於連接節點的無邊界的線程安全隊列,它採用FIFO原則對元素進行排序。採用「wait-free」算法(即CAS算法)來實現的。java
CoucurrentLinkedQueue規定了以下幾個不變性:node
head的不變性和可變性:算法
不變性安全
可變性多線程
tail的不變性和可變性併發
不變性源碼分析
可變性this
CoucurrentLinkedQueue的結構由head節點和tail節點組成,每一個節點由節點元素item和指向下一個節點的next引用組成,而節點與節點之間的關係就是經過該next關聯起來的,從而組成一張鏈表的隊列。節點Node爲ConcurrentLinkedQueue的內部類,定義以下:spa
private static class Node<E> {
/** 節點元素域 */
volatile E item;
volatile Node<E> next;
//初始化,得到item 和 next 的偏移量,爲後期的CAS作準備
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
/** 偏移量 */
private static final long itemOffset;
/** 下一個元素的偏移量 */
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
複製代碼
入列,咱們認爲是一個很是簡單的過程:tail節點的next執行新節點,而後更新tail爲新節點便可。從單線程角度咱們這麼理解應該是沒有問題的,可是多線程呢?若是一個線程正在進行插入動做,那麼它必須先獲取尾節點,而後設置尾節點的下一個節點爲當前節點,可是若是已經有一個線程剛恰好完成了插入,那麼尾節點是否是發生了變化?對於這種狀況ConcurrentLinkedQueue怎麼處理呢?咱們先看源碼:
offer(E e):將指定元素插入都隊列尾部:
public boolean offer(E e) {
//檢查節點是否爲null
checkNotNull(e);
// 建立新節點
final Node<E> newNode = new Node<E>(e);
//死循環 直到成功爲止
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// q == null 表示 p已是最後一個節點了,嘗試加入到隊列尾
// 若是插入失敗,則表示其餘線程已經修改了p的指向
if (q == null) { // --- 1
// casNext:t節點的next指向當前節點
// casTail:設置tail 尾節點
if (p.casNext(null, newNode)) { // --- 2
// node 加入節點後會致使tail距離最後一個節點相差大於一個,須要更新tail
if (p != t) // --- 3
casTail(t, newNode); // --- 4
return true;
}
}
// p == q 等於自身
else if (p == q) // --- 5
// p == q 表明着該節點已經被刪除了
// 因爲多線程的緣由,咱們offer()的時候也會poll(),若是offer()的時候正好該節點已經poll()了
// 那麼在poll()方法中的updateHead()方法會將head指向當前的q,而把p.next指向本身,即:p.next == p
// 這樣就會致使tail節點滯後head(tail位於head的前面),則須要從新設置p
p = (t != (t = tail)) ? t : head; // --- 6
// tail並無指向尾節點
else
// tail已經不是最後一個節點,將p指向最後一個節點
p = (p != t && t != (t = tail)) ? t : q; // --- 7
}
}
複製代碼
光看源碼仍是有點兒迷糊的,插入節點一次分析就會明朗不少。
ConcurrentLinkedQueue初始化時head、tail存儲的元素都爲null,且head等於tail:
按照程序分析:第一次插入元素A,head = tail = dummyNode,全部q = p.next = null,直接走步驟2:p.casNext(null, newNode),因爲 p == t成立,因此不會執行步驟3:casTail(t, newNode),直接return。插入A節點後以下:
q = p.next = A ,p = tail = dummyNode,因此直接跳到步驟7:p = (p != t && t != (t = tail)) ? t : q;。此時p = q,而後進行第二次循環 q = p.next = null,步驟2:p == null成立,將該節點插入,由於p = q,t = tail,因此步驟3:p != t 成立,執行步驟4:casTail(t, newNode),而後return。以下:
此時t = tail ,p = t,q = p.next = null,和插入元素A無異,以下:
這裏整個offer()過程已經分析完成了,可能p == q 有點兒難理解,p 不是等於q.next麼,怎麼會有p == q呢?這個疑問咱們在出列poll()中分析
ConcurrentLinkedQueue提供了poll()方法進行出列操做。入列主要是涉及到tail,出列則涉及到head。咱們先看源碼:
public E poll() {
// 若是出現p被刪除的狀況須要從head從新開始
restartFromHead: // 這是什麼語法?真心沒有見過
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 節點 item
E item = p.item;
// item 不爲null,則將item 設置爲null
if (item != null && p.casItem(item, null)) { // --- 1
// p != head 則更新head
if (p != h) // --- 2
// p.next != null,則將head更新爲p.next ,不然更新爲p
updateHead(h, ((q = p.next) != null) ? q : p); // --- 3
return item;
}
// p.next == null 隊列爲空
else if ((q = p.next) == null) { // --- 4
updateHead(h, p);
return null;
}
// 當一個線程在poll的時候,另外一個線程已經把當前的p從隊列中刪除——將p.next = p,p已經被移除不能繼續,須要從新開始
else if (p == q) // --- 5
continue restartFromHead;
else
p = q; // --- 6
}
}
}
複製代碼
這個相對於offer()方法而言會簡單些,裏面有一個很重要的方法:updateHead(),該方法用於CAS更新head節點,以下:
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
複製代碼
咱們先將上面offer()的鏈表poll()掉,添加A、B、C節點結構以下:
head = dumy,p = head, item = p.item = null,步驟1不成立,步驟4:(q = p.next) == null不成立,p.next = A,跳到步驟6,下一個循環,此時p = A,因此步驟1 item != null,進行p.casItem(item, null)成功,此時p == A != h,因此執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),q = p.next = B != null,則將head CAS更新成B,以下:
head = B , p = head = B,item = p.item = B,步驟成立,步驟2:p != h 不成立,直接return,以下:
head = dumy ,p = head = dumy,tiem = p.item = null,步驟1不成立,跳到步驟4:(q = p.next) == null,不成立,而後跳到步驟6,此時,p = q = C,item = C(item),步驟1成立,因此講C(item)設置爲null,步驟2:p != h成立,執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),以下:
看到這裏是否是一目瞭然了,在這裏咱們再來分析offer()的步驟5:
else if(p == q){
p = (t != (t = tail))? t : head;
}
複製代碼
ConcurrentLinkedQueue中規定,p == q代表,該節點已經被刪除了,也就說tail滯後於head,head沒法經過succ()方法遍歷到tail,怎麼作? (t != (t = tail))? t : head;(這段代碼的可讀性實在是太差了,真他媽難理解:不知道是否能夠理解爲t != tail ? tail : head)這段代碼主要是來判讀tail節點是否已經發生了改變,若是發生了改變,則說明tail已經從新定位了,只須要從新找到tail便可,不然就只能指向head了。
就上面那個咱們再次插入一個元素D。則p = head,q = p.next = null,執行步驟1: q = null且 p != t ,因此執行步驟4:,以下:
再插入元素E,q = p.next = null,p == t,因此插入E後以下:
ConcurrentLinkedQueue 的非阻塞算法實現可歸納爲下面 5 點: