接着上集繼續,SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素,因此其peek()方法始終返回null,沒有數據緩存空間。SynchronousQueue支持公平與非公平訪問,默認採用非公平性策略訪問隊列。java
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
複製代碼
相對於ArrayBlockingQueue利用ReentrantLock實現公平與非公平,而SynchronousQueue利用TransferQueue、TransferStack實現公平與非公平,從命名上來看前者隊列,後者棧,SynchronousQueue的入隊、出隊操做都是基於transfer來實現,ctrl+alt+h查看方法調用node
TransferQueue內部定義以下編程
// 頭節點
transient volatile QNode head;
// 尾節點
transient volatile QNode tail;
// 指向一個可能還未出隊被取消的節點,由於它在被取消時是最後一個插入節點
transient volatile QNode cleanMe;
// 默認構造函數,建立一個假節點
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
static final class QNode {
// 後繼節點
volatile QNode next;
// item數據
volatile Object item;
// 用來控制阻塞或喚醒
volatile Thread waiter; // to control park/unpark
// 是不是生產者
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
...
}
...
複製代碼
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// 判斷是不是生產者,true爲生產者,false爲消費者
boolean isData = (e != null);
// 死循環
for (;;) {
// 獲取尾節點
QNode t = tail;
// 獲取頭節點
QNode h = head;
// 若尾節點或尾節點爲空則跳出本次循序
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 若TransferQueue爲空或當前節點與尾節點模式同樣
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 若t不是尾節點代表已有其餘線程操做過,跳出本次循環從新來
if (t != tail) // inconsistent read
continue;
// 若以前獲取的尾節點後繼不爲空代表已有其餘線程添加過節點
if (tn != null) { // lagging tail
// CAS將tn置爲尾節點
advanceTail(t, tn);
continue;
}
// 若採用了時限模式且超時,直接返回null
if (timed && nanos <= 0) // can't wait
return null;
// 若s爲null,構建一個新節點
if (s == null)
s = new QNode(e, isData);
// CAS將新節點加入隊列中,若失敗從新來
if (!t.casNext(null, s)) // failed to link in
continue;
// CAS將新節點s置爲尾節點
advanceTail(t, s); // swing tail and wait
// 自旋獲取匹配item
Object x = awaitFulfill(s, e, timed, nanos);
// 若x==s代表線程獲取匹配項時,超時或者被中斷,清除節點s
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
// 判斷節點s是否已經出隊
if (!s.isOffList()) { // not already unlinked
// CAS將節點s置爲head,移出隊列
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
}
// else分支下述
}
}
複製代碼
咱們假定有線程A、B在put操做,線程C在take操做,當前TransferQueue初始化以下:數組
線程A添加元素A,head=tail走第一個分支,由於沒有采用鎖機制,因此可能會有其餘線程搶先操做,其採用各類判斷以及CAS來判斷是否有其餘線程操做過,添加完尾結點後,會調用awaitFulfill方法,其做用是自旋尋找匹配節點,若超過自旋次數此線程會阻塞,線程被中斷或採用時限模式時獲取超時這次操做會被取消。Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 獲取最後期限
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 獲取當前線程
Thread w = Thread.currentThread();
// 獲取自旋次數,若新節點s爲頭節點後繼節點才能自旋
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 判斷當前線程是否被中斷
if (w.isInterrupted())
// 取消當前節點,cas將item置爲this
s.tryCancel(e);
// 獲取節點s的item
Object x = s.item;
// 若線程中斷,節點s的item與x會不相等,直接返回x
if (x != e)
return x;
// 若採用了時限模式
if (timed) {
// 計算剩餘時間
nanos = deadline - System.nanoTime();
// 若超時,取消節點
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 若還有自旋次數,自旋-1
if (spins > 0)
--spins;
// 若等待線程爲null,將節點s的等待線程置爲當前線程
else if (s.waiter == null)
s.waiter = w;
// 若沒有采用時限模式則調用LockSupport.park()直接阻塞線程
else if (!timed)
LockSupport.park(this);
// 若剩餘時間超過自旋時間閾值則指定時間阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
複製代碼
從源碼中能夠看到只有頭節點後繼才能自旋,線程A自旋一段時間匹配節點,若自旋次數用光會一直阻塞,因此每個線程只有匹配到節點後或者因超時、中斷被取消才能繼續添加元素
緩存
線程A自旋,線程B接着put安全
那麼何時才匹配到呢?在開頭咱們提到每個put操做必須等待一個take操做,這時其餘線程take(),e爲null,isData爲false,與尾節點的isData屬性不一樣,走進else分支,先獲取頭節點的後繼節點數據,若沒有其餘線程搶先操做,且put操做未被取消,m.casItem(x, e)數據替換,將節點m的item屬性置爲null,若CAS替換成功代表匹配成功,在put自旋時會用item與e比較,take()將item置爲null,不相等返回null
併發
else { // complementary-mode
// 獲取頭節點後繼
QNode m = h.next; // node to fulfill
// 若t不是尾節點或者m爲null或者h不是頭節點,即已有其餘線程搶先操做過
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // 節點已被操做過
x == m || // 節點被取消
!m.casItem(x, e)) { // lost CAS
// CAS將m置爲頭節點,重來
advanceHead(h, m); // dequeue and retry
continue;
}
// 若走這裏,代表匹配成功
advanceHead(h, m); // successfully fulfilled
// 喚醒m的等待線程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
複製代碼
TransferStack內部定義以下app
// 未執行的消費者
static final int REQUEST = 0;
// 未執行的生產者
static final int DATA = 1;
// 線程正在匹配節點
static final int FULFILLING = 2;
volatile SNode head;
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
...
}
...
複製代碼
TransferStack相對於TransferQueue中的節點,其數據項item與模式mode不須要用volatile修飾,由於它們老是寫在前讀在後。less
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
// REQUEST:消費者;DATA:生產者
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 若棧爲空或者新增元素模式與首元素模式相同
if (h == null || h.mode == mode) { // empty or same-mode
// 超時
if (timed && nanos <= 0) { // can't wait
// 若節點被取消,將取消節點出隊,從新來
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
//若不採用限時或者未超時,建立節點CAS將其置爲頭節點,s→h
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋匹配
SNode m = awaitFulfill(s, timed, nanos);
// 若m==s代表節點被取消
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 其他分支下述
}
}
複製代碼
依然模擬場景,假定如今線程A、B在put,線程C、D在take。
線程A進行put新增元素A,CAS頭插元素A,調用awaitFulfill()自旋匹配,注意只有頭節點、空棧或者協助節點才能自旋,每次自旋都會進行條件判斷,爲了
dom
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 最後期限
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次數
// 若棧爲空、節點爲首結點或者該節點模式爲FULFILLING才能自旋
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 若線程中斷,取消該節點
if (w.isInterrupted())
s.tryCancel();
// 匹配節點
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
// 超時,取消節點
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 每次自旋需先判斷是否知足自旋條件,知足次數-1
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
// 若沒有采用時限模式則調用LockSupport.park()直接阻塞線程
else if (!timed)
LockSupport.park(this);
// 若剩餘時間超過自旋時間閾值則指定時間阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
複製代碼
線程B接着put元素B,頭節點A的模式與put操做的模式一致,CAS頭插成功後,也調用awaitFulfill()自旋,因爲頭節點變爲線程B因此只有線程B才能自旋匹配,這也是不公平的體現
節點的取消與公平模式的差很少都是將屬性置爲其自己void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
複製代碼
這時線程C進行take操做,take的模式(REQUEST)明顯與當前頭節點B(DATA)不一致且頭節點模式也不爲FULFILLING,因此transfer走入else if分支。
// 若頭節點的模式不爲 FULFILLING
} else if (!isFulfilling(h.mode)) { // try to fulfill
// 若頭節點被取消,將頭節點出隊從新來
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
// 將節點s出隊
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
// 獲取節點m的後繼節點
SNode mn = m.next;
// 嘗試匹配
if (m.tryMatch(s)) {
// 匹配成功,將節點s、m出隊
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 若匹配失敗,將m出隊
s.casNext(m, mn); // help unlink
}
}
複製代碼
建立一個FULFILLING模式的節點並CAS將其置爲頭節點,與其後繼匹配,匹配方法以下
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
複製代碼
若節點沒有被取消,其match爲null,被取消則爲其自身。成功匹配後將一對put、take操做的節點出隊。咱們假定另外一種場景,若線程C的take節點入隊後,未進行匹配線程D中途take
頭節點C模式爲FULFILLING,transfer走入最後一個分支,並不會先建立節點而是 幫助頭節點先行匹配完成入隊出隊操做後,再第二次循環繼續執行本身操做// 頭節點模式爲 FULFILLING
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
複製代碼
LinkedTransferQueue是由鏈表結構組成的無界阻塞FIFO隊列
// 判斷是否多核處理器
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
// 自旋次數
private static final int FRONT_SPINS = 1 << 7;
// 前驅節點正在操做,當前節點自旋的次數
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
static final int SWEEP_THRESHOLD = 32;
// 頭節點
transient volatile Node head;
// 尾節點
private transient volatile Node tail;
// 刪除節點失敗的次數
private transient volatile int sweepVotes;
/**
* xfer()方法中使用
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
複製代碼
static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter;
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
...
}
複製代碼
是否是感受與SynchronousQueue中TransferQueue的QNode節點類定義很相似
LinkedTransferQueue的大多方法都是基於xfer()方法
/**
* @param e 入隊數據
* @param haveData true:入隊;flase:出隊
* @param how NOW, ASYNC, SYNC, or TIMED
* @param nanos 期限僅TIMED限時模式使用
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
// 如果入隊操做,但無數據拋異常
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
// 從頭節點遍歷
for (Node h = head, p = h; p != null;) { // find & match first node
// 獲取模式isData
boolean isData = p.isData;
// 獲取數據項
Object item = p.item;
// 找到未匹配的節點
if (item != p && (item != null) == isData) { // unmatched
// 若操做模式同樣,不匹配
if (isData == haveData) // can't match
break;
// 若匹配,CAS將替換item
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
// 更新 head
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 喚醒線程
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.cast(item);
}
}
// 後繼
Node n = p.next;
// 若p的後繼是其自身,代表p已經有其餘線程操做過,從頭節點重寫開始
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 若沒有找到匹配節點,
// NOW爲untimed poll, tryTransfer,不會入隊
if (how != NOW) { // No matches available
if (s == null)
// 建立節點
s = new Node(e, haveData);
// 尾插入隊
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
// 若不是異步操做
if (how != ASYNC)
// 阻塞等待匹配值
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
複製代碼
以put()方法爲例,假定隊列爲空此時有線程put(其內部xfer(e, true, ASYNC, 0)),由於不等於now,調用tryAppend()方法尾插入隊
private Node tryAppend(Node s, boolean haveData) {
// 從尾節點開始
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 若隊列爲空CAS將S置爲頭節點
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
// 若不是最後節點
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
// CAS設置將s置爲p的後繼
else if (!p.casNext(null, s))
// 若設置失敗從新來
p = p.next; // re-read on CAS failure
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
複製代碼
從源碼中能夠得知,當第一次tryAppend()隊列爲空時只設置了頭節點,第二次tryAppend()纔會設置尾結點,入隊後,若不是ASYNC還會調用awaitMatch()方法阻塞匹配
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 若限時獲取最後期限
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 不相等代表已經匹配過,有其餘線程已操做過
if (item != e) { // matched
// assert item != s;
// 取消節點
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.cast(item);
}
// 若線程中斷或超時則取消節點
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
// 初始化自旋次數
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
// 自旋
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
// 若採用限時則限時阻塞
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
// 直接阻塞
else {
LockSupport.park(this);
}
}
}
複製代碼
其整個隊列只存在一個操做(入隊或出隊),若不一樣操做會替換item喚醒相應另個線程,若相同操做則根據形參how判斷判斷
NOW:直接返回操做節點不入隊
ASYNC:操做節點尾插入隊,但不會阻塞等待直接返回,同一個線程隨便可以接着操做
SYNC:操做節點尾插入隊且會自旋匹配一段時間,自旋次數用完進入阻塞狀態,像SynchronousQueue同樣同一個線程操做完必須匹配到或被取消後才能繼續操做
TIMED:限時模式,在指定時間內若沒匹配到操做會被取消
相對於SynchronousQueue,LinkedTransferQueue能夠存儲元素且可支持不阻塞形式的操做,而相對於LinkedBlockingQueue維護了入隊鎖和出隊鎖,LinkedTransferQueue經過CAS保證線程安全更提升了效率
LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列,雙向隊列就意味着能夠從對頭、對尾兩端插入和移除元素。LinkedBlockingDeque默認構造容量Integer.MAX_VALUE,也能夠指定容量
// 頭節點
transient Node first;
// 尾節點
transient Node last;
// 元素個數
private transient int count;
// 容量
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
複製代碼
static final class Node {
// 數據項
E item;
// 前驅節點
Node prev;
// 後繼節點
Node next;
Node(E x) {
item = x;
}
}
複製代碼
public void putFirst(E e) throws InterruptedException {
// 判空
if (e == null) throw new NullPointerException();
// 建立節點
Node node = new Node(e);
final ReentrantLock lock = this.lock;
// 獲取鎖
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
複製代碼
判空處理而後獲取鎖,調用linkFirst()入隊
private boolean linkFirst(Node node) {
// assert lock.isHeldByCurrentThread();
// 若當前元素個數超過指定容量,返回false
if (count >= capacity)
return false;
// 獲取首節點
Node f = first;
// 新節點後繼指向首節點
node.next = f;
// 新節點置爲首節點
first = node;
// 若隊列爲空則新節點置爲尾節點
if (last == null)
last = node;
// 若不爲空,新節點置爲首節點的前驅節點
else
f.prev = node;
// 元素個數+1
++count;
// 喚醒出隊(消費者)等待隊列中線程
notEmpty.signal();
return true;
}
複製代碼
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node node = new Node(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
複製代碼
判空處理而後獲取鎖,調用linkLast()入隊
private boolean linkLast(Node node) {
// assert lock.isHeldByCurrentThread();
// 若當前元素個數超過指定容量,返回false
if (count >= capacity)
return false;
// 獲取尾節點
Node l = last;
// 將新節點的前驅節點置爲原尾節點
node.prev = l;
// 新節點置爲尾節點
last = node;
// 若隊列爲空,首結點置爲頭節點
if (first == null)
first = node;
// 不然將新節點置爲原未節點的後繼節點
else
l.next = node;
// 元素個數+1
++count;
// 喚醒出隊(消費者)等待隊列中線程
notEmpty.signal();
return true;
}
複製代碼
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
複製代碼
unlinkFirst()方法
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
// 獲取頭節點
Node f = first;
// 若first爲null即隊列爲空,返回null
if (f == null)
return null;
// 獲取頭節點的後繼節點
Node n = f.next;
E item = f.item;
// 刪除頭節點
f.item = null;
f.next = f; // help GC
// 將原頭節點的後繼節點置爲頭節點
first = n;
// 若原隊列僅一個節點,則尾節點置空
if (n == null)
last = null;
// 不然原頭節點的後繼節點的前驅置爲null
else
n.prev = null;
// 元素個數-1
--count;
// 喚醒入隊(生產者)等待隊列中線程
notFull.signal();
return item;
}
複製代碼
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
複製代碼
unlinkLast
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
// 獲取尾節點
Node l = last;
// 尾節點爲null即隊列爲空,返回null
if (l == null)
return null;
// 獲取原尾節點的前驅節點
Node p = l.prev;
E item = l.item;
// 刪除尾節點
l.item = null;
l.prev = l; // help GC
// 將原尾節點的前驅節點置爲尾節點
last = p;
// 若原隊列僅一個節點,則頭節點置空
if (p == null)
first = null;
// 不然原尾節點的前驅節點的後繼置爲null
else
p.next = null;
// 元素個數-1
--count;
notFull.signal();
return item;
}
複製代碼
邏輯就很少說了,看過LinkedList源碼的應該不會陌生,除了多了喚醒阻塞獲取鎖操做,基本邏輯相似
《java併發編程的藝術》