要實現對隊列的安全訪問,有兩種方式:阻塞算法和非阻塞算法。阻塞算法的實現是使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現;非阻塞算法使用自旋+CAS實現。html
今天來探究下使用非阻塞算法來實現的線程安全隊列ConcurrentLinkedQueue,它是一個基於連接節點的無界線程安全隊列,採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部,當咱們獲取一個元素時,它會返回隊列頭部的元素。它採用了「wait-free」算法(即CAS算法)來實現。java
ConcurrentLinkedQueue的類圖結構:node
從類圖中能夠看到,ConcurrentLinkedQueue由head和tail節點組成,每一個節點Node由節點元素item和指向下一個節點的引用next組成,節點與節點之間經過next關聯起來組成一張鏈表結構的隊列。算法
private static class Node<E> { volatile E item;//元素 volatile Node<E> next;//下一節點 Node(E item) {//添加元素 UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) {//cas修改元素 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) {//添加節點 UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) {//cas修改節點 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; //得到元素的偏移位置 itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); //得到下一節點的偏移位置 nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } //頭節點 private transient volatile Node<E> head; //尾節點 private transient volatile Node<E> tail; public ConcurrentLinkedQueue() { //默認狀況下head節點存儲的元素爲空,tail節點等於head節點。 head = tail = new Node<E>(null); } public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; //遍歷集合 for (E e : c) { checkNotNull(e);//檢查是否爲空,若是爲空拋出空指針異常 //建立節點和將元素存儲到節點中 Node<E> newNode = new Node<E>(e); if (h == null)//頭節點爲空 h = t = newNode;//頭和尾節點是建立的節點 else { t.lazySetNext(newNode);//添加節點 t = newNode;//修改尾節點的標識 } } //若是集合沒有元素,設置隊列的頭尾節點爲空 if (h == null) h = t = new Node<E>(null); head = h;//更新隊列的頭節點標識 tail = t;//更新隊列的尾節點標識 } private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
入隊操做主要作兩件事情,第一是將入隊節點設置成當前隊列尾節點的下一個節點;第二是更新tail節點,若是tail節點的next節點不爲空,則將入隊節點設置成tail節點,若是tail節點的next節點爲空,則將入隊節點設置成tail的next節點,因此tail節點不老是尾節點;安全
上面的分析讓咱們從單線程入隊的角度來理解入隊過程,可是多個線程同時進行入隊狀況就變得更加複雜,由於可能會出現其餘線程插隊的狀況。若是有一個線程正在入隊,那麼它必須先獲取尾節點,而後設置尾節點的下一個節點爲入隊節點,但這時可能有另一個線程插隊了,那麼隊列的尾節點就會發生變化,這時當前線程要暫停入隊操做,而後從新獲取尾節點。多線程
public boolean add(E e) { return offer(e); } public boolean offer(E e) { checkNotNull(e);//檢查是否爲空 //建立入隊節點,將元素添加到節點中 final Node<E> newNode = new Node<E>(e); //自旋隊列CAS直到入隊成功 // 一、根據tail節點定位出尾節點(last node);二、將新節點置爲尾節點的下一個節點;三、casTail更新尾節點 for (Node<E> t = tail, p = t;;) { //p是尾節點,q獲得尾節點的next Node<E> q = p.next; //若是q爲空 if (q == null) { //p是last node,將尾節點的next修改成建立的節點 if (p.casNext(null, newNode)) { //p在遍歷後會變化,所以須要判斷,若是不相等即p != t = tail,表示t(= tail)不是尾節點,則將入隊節點設置成tail節點,更新失敗了也不要緊,由於失敗了表示有其餘線程成功更新了tail節點 if (p != t) casTail(t, newNode);//入隊節點更新爲尾節點,容許失敗,所以t= tail並不老是尾節點 return true;//結束 } } //從新獲取head節點:多線程操做時,輪詢後p有可能等於q,此時,就須要對p從新賦值 //(多線程自引用的狀況,只有offer()和poll()交替執行時會出現) else if (p == q) //由於併發下可能tail被改了,若是被改了,則使用新的t,不然跳轉到head,從鏈表頭從新輪詢,由於從head開始全部的節點均可達 p = (t != (t = tail)) ? t : head;//運行到這裏再繼續自旋遍歷 else /** * 尋找尾節點,一樣,當t不等於p時,說明p在上面被從新賦值了,而且tail也被別的線程改了,則使用新的tail,不然循環檢查p的下個節點 * (多offer()狀況下會出現) * p=condition?result1:result2 * 知足result1的場景爲 : * 獲取尾節點tail的快照已通過時了(其餘線程更新了新的尾節點tail),直接跳轉到當前得到的最新尾節點的地方 * 知足result2的場景爲: * 多線程同時操做offer(),執行p.casNext(null, newNode)CAS成功後,未更新尾節點(未執行casTail(t, newNode)方法:兩種緣由 1是未知足前置條件if判斷 2是CAS更新失敗),直接找next節點 */ p = (p != t && t != (t = tail)) ? t : q;//運行到這裏再繼續自旋遍歷 } }
public static void main(String[] args) throws IndexOutOfBoundsException { ConcurrentLinkedQueue c = new ConcurrentLinkedQueue(); new Thread(()->{ int i; for(i=0;i<10;){ c.offer(i++); Object poll = c.poll();//註釋或取消進行測試 System.out.println(Thread.currentThread().getName()+":"+poll); } }).start(); new Thread(()->{ int i; for(i=200;i<210;){ c.offer(i++); Object poll = c.poll();//註釋或取消進行測試 System.out.println(Thread.currentThread().getName()+":"+poll); } }).start(); }
public E poll() { restartFromHead: //自旋 for (;;) { //得到頭節點 for (Node<E> h = head, p = h, q;;) { E item = p.item;//得到頭節點元素 //若是頭節點元素不爲null而且cas刪除頭節點元素成功 if (item != null && p.casItem(item, null)) { //p被修改了 if (p != h) // hop two nodes at a time // 若是p 的next 屬性不是null ,將 p 做爲頭節點,而 q 將會消失 updateHead(h, ((q = p.next) != null) ? q : p); return item; } //若是頭節點的元素爲空或頭節點發生了變化,這說明頭節點已經被另一個線程修改了。 // 那麼獲取p節點的下一個節點,若是p節點的下一節點爲null,則代表隊列已經空了 // 若是 p(head) 的 next 節點 q 也是null,則表示沒有數據了,返回null,則將 head 設置爲null // 注意:updateHead 方法最後還會將原有的 head 做爲本身 next 節點,方便offer 鏈接。 else if ((q = p.next) == null) { updateHead(h, p); return null; } //若是 p == q,說明別的線程取出了 head,並將 head 更新了。就須要從新開始獲取head節點 else if (p == q) continue restartFromHead; // 若是下一個元素不爲空,則將頭節點的下一個節點設置成頭節點 else p = q; } } } final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) // 將舊的頭結點h的next域指向爲h h.lazySetNext(h); }
若是這時,再有一個線程來添加元素,經過tail獲取的next節點則仍然是它自己,這就出現了p == q的狀況,出現該種狀況以後,則會觸發執行head的更新,將p節點從新指向爲head,全部「活着」的節點(指未刪除節點),都能從head經過遍歷可達,這樣就能經過head成功獲取到尾節點,而後添加元素了。併發
// 獲取鏈表的首部元素(只讀取而不移除) public E peek() { restartFromHead: //自旋 for (;;) { for (Node<E> h = head, p = h, q;;) { //得到頭節點元素 E item = p.item; //頭節點元素不爲空或頭節點下一節點爲空(表示鏈表只有一個節點) if (item != null || (q = p.next) == null) { updateHead(h, p);//更新頭節點標識 return item; } /若是 p == q,說明別的線程取出了 head,並將 head 更新了。就須要從新開始獲取head節點 else if (p == q) continue restartFromHead; // 若是下一個元素不爲空,則將頭節點的下一個節點設置成頭節點 else p = q; } } }
public boolean isEmpty() { return first() == null; } Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { //頭節點是否有元素 boolean hasItem = (p.item != null); //頭節點有元素或當前鏈表只有一個節點 if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null;//頭節點有值返回節點,不然返回null } else if (p == q) continue restartFromHead; else p = q; } } }
public int size() { int count = 0; // first()獲取第一個具備非空元素的節點,若不存在,返回null // succ(p)方法獲取p的後繼節點,若p == p的後繼節點,則返回head for (Node<E> p = first(); p != null; p = succ(p)) //節點有元素數量+1 if (p.item != null) if (++count == Integer.MAX_VALUE) break; return count; } //取下一節點 final Node<E> succ(Node<E> p) { Node<E> next = p.next; //若p == p的後繼節點(自引用狀況下會出現),則返回head return (p == next) ? head : next; }
public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; // 若找到匹配節點,則返回true if (item != null && o.equals(item)) return true; } return false; }
public boolean remove(Object o) { //刪除的元素不能爲null, if (o != null) { Node<E> next, pred = null; //遍歷,開始得到頭節點, for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false;//刪除的標識 E item = p.item;//節點元素 if (item != null) { //節點的元素不等於要刪除的元素,獲取下一節點進行遍歷循環操做 if (!o.equals(item)) { next = succ(p);//將當前遍歷的節點移到下一節點 continue; } //節點元素等於刪除元素,CAS將節點元素置爲null removed = p.casItem(item, null); } next = succ(p);//獲取刪除節點的下一節點, //有前節點和後置節點 if (pred != null && next != null) // unlink pred.casNext(p, next);//刪除當前節點,即當前節點移除出隊列 if (removed)//元素刪除了返回true return true; } } return false; }