一行一行源碼分析清楚 AbstractQueuedSynchronizer (二)html
轉自https://www.javadoop.com/post...java
文章比較長,信息量比較大,建議在 pc 上閱讀。文章標題是爲了呼應前文,其實能夠單獨成文的,主要是但願讀者看文章能系統看。node
本文關注如下幾點內容:linux
基本上本文把以上幾點都說清楚了,我假設讀者看過上一篇文章中對 AbstractQueuedSynchronizer 的介紹 ,固然若是你已經熟悉 AQS 中的獨佔鎖了,那也能夠直接看這篇。各小節之間基本上沒什麼關係,你們能夠只關注本身感興趣的部分。git
ReentrantLock 默認採用非公平鎖,除非你在構造方法中傳入參數 true 。面試
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平鎖的 lock 方法:數據庫
static final class FairSync extends Sync { final void lock() { acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1\. 和非公平鎖相比,這裏多了一個判斷:是否有線程在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
非公平鎖的 lock 方法:編程
static final class NonfairSync extends Sync { final void lock() { // 2\. 和公平鎖相比,這裏會直接先進行一次CAS,成功就返回了 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
總結:公平鎖和非公平鎖只有兩處不一樣:
公平鎖和非公平鎖就這兩點區別,若是這兩次 CAS 都不成功,那麼後面非公平鎖和公平鎖是同樣的,都要進入到阻塞隊列等待喚醒。
相對來講,非公平鎖會有更好的性能,由於它的吞吐量比較大。固然,非公平鎖讓獲取鎖的時間變得更加不肯定,可能會致使在阻塞隊列中的線程長期處於飢餓狀態。
Tips: 這裏重申一下,要看懂這個,必需要先看懂上一篇關於 AbstractQueuedSynchronizer 的介紹,或者你已經有相關的知識了,不然這節確定是看不懂的。
咱們先來看看 Condition 的使用場景,Condition 常常能夠用在生產者-消費者的場景中,請看 Doug Lea 給出的這個例子:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); // condition 依賴於 lock 來產生 final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生產 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); // 隊列已滿,等待,直到 not full 才能繼續生產 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去 } finally { lock.unlock(); } } // 消費 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 隊列爲空,等待,直到隊列 not empty,才能繼續消費 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去 return x; } finally { lock.unlock(); } } }
(ArrayBlockingQueue 採用這種方式實現了生產者-消費者,因此請只把這個例子當作學習例子,實際生產中能夠直接使用 ArrayBlockingQueue)
咱們經常使用 obj.wait(),obj.notify() 或 obj.notifyAll() 來實現類似的功能,可是,它們是基於對象的監視器鎖的。須要深刻了解這幾個方法的讀者,能夠參考個人另外一篇文章《深刻分析 java 8 編程語言規範:Threads and Locks》。而這裏說的 Condition 是基於 ReentrantLock 實現的,而 ReentrantLock 是依賴於 AbstractQueuedSynchronizer 實現的。
在往下看以前,讀者內心要有一個總體的概念。condition 是依賴於 ReentrantLock 的,無論是調用 await 進入等待仍是 signal 喚醒,都必須獲取到鎖才能進行操做。
每一個 ReentrantLock 實例能夠經過調用屢次 newCondition 產生多個 ConditionObject 的實例:
final ConditionObject newCondition() { return new ConditionObject(); }
咱們首先來看下咱們關注的 Condition 的實現類 AbstractQueuedSynchronizer
類中的 ConditionObject
。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; // 條件隊列的第一個節點 // 不要管這裏的關鍵字 transient,是不參與序列化的意思 private transient Node firstWaiter; // 條件隊列的最後一個節點 private transient Node lastWaiter; ......
在上一篇介紹 AQS 的時候,咱們有一個阻塞隊列,用於保存等待獲取鎖的線程的隊列。這裏咱們引入另外一個概念,叫條件隊列(condition queue),我畫了一張簡單的圖用來講明這個。
這裏的阻塞隊列若是叫作同步隊列(sync queue)其實比較貼切,不過爲了和前篇呼應,我就繼續使用阻塞隊列了。記住這裏的兩個概念, 阻塞隊列和 條件隊列。
轉存失敗從新上傳取消
這裏,咱們簡單回顧下 Node 的屬性:
volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter;prev 和 next 用於實現阻塞隊列的雙向鏈表,nextWaiter 用於實現條件隊列的單向鏈表
基本上,把這張圖看懂,你也就知道 condition 的處理流程了。因此,我先簡單解釋下這圖,而後再具體地解釋代碼實現。
我這裏說的 一、二、3 是最簡單的流程,沒有考慮中斷、signalAll、還有帶有超時參數的 await 方法等,不過把這裏弄懂是這節的主要目的。
同時,從圖中也能夠很直觀地看出,哪些操做是線程安全的,哪些操做是線程不安全的。
這個圖看懂後,下面的代碼分析就簡單了。
接下來,咱們一步步按照流程來走代碼分析,咱們先來看看 wait 方法:
// 首先,這個方法是可被中斷的,不可被中斷的是另外一個方法 awaitUninterruptibly() // 這個方法會阻塞,直到調用 signal 方法(指 signal() 和 signalAll(),下同),或被中斷 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加到 condition 的條件隊列中 Node node = addConditionWaiter(); // 釋放鎖,返回值是釋放鎖以前的 state 值 int savedState = fullyRelease(node); int interruptMode = 0; // 這裏退出循環有兩種狀況,以後再仔細分析 // 1\. isOnSyncQueue(node) 返回 true,即當前 node 已經轉移到阻塞隊列了 // 2\. checkInterruptWhileWaiting(node) != 0 會到 break,而後退出循環,表明的是線程中斷 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被喚醒後,將進入阻塞隊列,等待獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
其實,我大致上也把整個 await 過程說得十之八九了,下面咱們分步把上面的幾個點用源碼說清楚。
addConditionWaiter() 是將當前節點加入到條件隊列,看圖咱們知道,這種條件隊列內的操做是線程安全的。
// 將當前線程對應的節點入隊,插入隊尾 private Node addConditionWaiter() { Node t = lastWaiter; // 若是條件隊列的最後一個節點取消了,將其清除出去 if (t != null && t.waitStatus != Node.CONDITION) { // 這個方法會遍歷整個條件隊列,而後會將已取消的全部節點清除出隊列 unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 若是隊列爲空 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
在addWaiter 方法中,有一個 unlinkCancelledWaiters() 方法,該方法用於清除隊列中已經取消等待的節點。
當 await 的時候若是發生了取消操做(這點以後會說),或者是在節點入隊的時候,發現最後一個節點是被取消的,會調用一次這個方法。
// 等待隊列是一個單向鏈表,遍歷鏈表將已經取消等待的節點清除出去 // 純屬鏈表操做,很好理解,看不懂多看幾遍就能夠了 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 若是節點的狀態不是 Node.CONDITION 的話,這個節點就是被取消的 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
回到 wait 方法,節點入隊了之後,會調用 int savedState = fullyRelease(node);
方法釋放鎖,注意,這裏是徹底釋放獨佔鎖,由於 ReentrantLock 是能夠重入的。
// 首先,咱們要先觀察到返回值 savedState 表明 release 以前的 state 值 // 對於最簡單的操做:先 lock.lock(),而後 condition1.await()。 // 那麼 state 通過這個方法由 1 變爲 0,鎖釋放,此方法返回 1 // 相應的,若是 lock 重入了 n 次,savedState == n // 若是這個方法失敗,會將節點設置爲"取消"狀態,並拋出異常 IllegalMonitorStateException final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 這裏使用了當前的 state 做爲 release 的參數,也就是徹底釋放掉鎖,將 state 置爲 0 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
釋放掉鎖之後,接下來是這段,這邊會自旋,若是發現本身還沒到阻塞隊列,那麼掛起,等待被轉移到阻塞隊列。
int interruptMode = 0; while (!isOnSyncQueue(node)) { // 線程掛起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
isOnSyncQueue(Node node) 用於判斷節點是否已經轉移到阻塞隊列了:
// 在節點入條件隊列的時候,初始化時設置了 waitStatus = Node.CONDITION // 前面我提到,signal 的時候須要將節點從條件隊列移到阻塞隊列, // 這個方法就是判斷 node 是否已經移動到阻塞隊列了 final boolean isOnSyncQueue(Node node) { // 移動過去的時候,node 的 waitStatus 會置爲 0,這個以後在說 signal 方法的時候會說到 // 若是 waitStatus 仍是 Node.CONDITION,也就是 -2,那確定就是還在條件隊列中 // 若是 node 的前驅 prev 指向仍是 null,說明確定沒有在 阻塞隊列 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是 node 已經有後繼節點 next 的時候,那確定是在阻塞隊列了 if (node.next != null) return true; // 這個方法從阻塞隊列的隊尾開始從後往前遍歷找,若是找到相等的,說明在阻塞隊列,不然就是不在阻塞隊列 // 能夠經過判斷 node.prev() != null 來推斷出 node 在阻塞隊列嗎?答案是:不能。 // 這個能夠看上篇 AQS 的入隊方法,首先設置的是 node.prev 指向 tail, // 而後是 CAS 操做將本身設置爲新的 tail,但是此次的 CAS 是可能失敗的。 // 調用這個方法的時候,每每咱們須要的就在隊尾的部分,因此通常都不須要徹底遍歷整個隊列的 return findNodeFromTail(node); } // 從同步隊列的隊尾往前遍歷,若是找到,返回 true private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
回到前面的循環,isOnSyncQueue(node) 返回 false 的話,那麼進到 LockSupport.park(this);
這裏線程掛起。
爲了你們理解,這裏咱們先看喚醒操做,由於剛剛到 LockSupport.park(this); 把線程掛起了,等待喚醒。
喚醒操做一般由另外一個線程來操做,就像生產者-消費者模式中,若是線程由於等待消費而掛起,那麼當生產者生產了一個東西后,會調用 signal 喚醒正在等待的線程來消費。
// 喚醒等待了最久的線程 // 其實就是,將這個線程對應的 node 從條件隊列轉移到阻塞隊列 public final void signal() { // 調用 signal 方法的線程必須持有當前的獨佔鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } // 從條件隊列隊頭日後遍歷,找出第一個須要轉移的 node // 由於前面咱們說過,有些線程會取消排隊,可是還在隊列中 private void doSignal(Node first) { do { // 將 firstWaiter 指向 first 節點後面的第一個 // 若是將隊頭移除後,後面沒有節點在等待了,那麼須要將 lastWaiter 置爲 null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 由於 first 立刻要被移到阻塞隊列了,和條件隊列的連接關係在這裏斷掉 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // 這裏 while 循環,若是 first 轉移不成功,那麼選擇 first 後面的第一個節點進行轉移,依此類推 } // 將節點從條件隊列轉移到阻塞隊列 // true 表明成功轉移 // false 表明在 signal 以前,節點已經取消了 final boolean transferForSignal(Node node) { // CAS 若是失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION,說明節點已經取消, // 既然已經取消,也就不須要轉移了,方法返回,轉移後面一個節點 // 不然,將 waitStatus 置爲 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // enq(node): 自旋進入阻塞隊列的隊尾 // 注意,這裏的返回值 p 是 node 在阻塞隊列的前驅節點 Node p = enq(node); int ws = p.waitStatus; // ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程。喚醒以後會怎麼樣,後面再解釋 // 若是 ws <= 0, 那麼 compareAndSetWaitStatus 將會被調用,上篇介紹的時候說過,節點入隊後,須要把前驅節點的狀態設爲 Node.SIGNAL(-1) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 若是前驅節點取消或者 CAS 失敗,會進到這裏喚醒線程,以後的操做看下一節 LockSupport.unpark(node.thread); return true; }
正常狀況下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
這句中,ws <= 0,並且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 會返回 true,因此通常也不會進去 if 語句塊中喚醒 node 對應的線程。而後這個方法返回 true,也就意味着 signal 方法結束了,節點進入了阻塞隊列。
假設發生了阻塞隊列中的前驅節點取消等待,或者 CAS 失敗,只要喚醒線程,讓其進到下一步便可。
上一步 signal 以後,咱們的線程由條件隊列轉移到了阻塞隊列,以後就準備獲取鎖了。只要從新獲取到鎖了之後,繼續往下執行。
等線程從掛起中恢復過來,繼續往下看
int interruptMode = 0; while (!isOnSyncQueue(node)) { // 線程掛起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
先解釋下 interruptMode。interruptMode 能夠取值爲 REINTERRUPT(1),THROW_IE(-1),0
有如下三種狀況會讓 LockSupport.park(this); 這句返回繼續往下執行:
線程喚醒後第一步是調用 checkInterruptWhileWaiting(node) 這個方法,此方法用於判斷是否在線程掛起期間發生了中斷,若是發生了中斷,是 signal 調用以前中斷的,仍是 signal 以後發生的中斷。
// 1\. 若是在 signal 以前已經中斷,返回 THROW_IE // 2\. 若是是 signal 以後中斷,返回 REINTERRUPT // 3\. 沒有發生中斷,返回 0 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
Thread.interrupted():若是當前線程已經處於中斷狀態,那麼該方法返回 true,同時將中斷狀態重置爲 false,因此,纔有後續的
從新中斷(REINTERRUPT)
的使用。
看看怎麼判斷是 signal 以前仍是以後發生的中斷:
// 只有線程處於中斷狀態,纔會調用此方法 // 若是須要的話,將這個已經取消等待的節點轉移到阻塞隊列 // 返回 true:若是此線程在 signal 以前被取消, final boolean transferAfterCancelledWait(Node node) { // 用 CAS 將節點狀態設置爲 0 // 若是這步 CAS 成功,說明是 signal 方法以前發生的中斷,由於若是 signal 先發生的話,signal 中會將 waitStatus 設置爲 0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 將節點放入阻塞隊列 // 這裏咱們看到,即便中斷了,依然會轉移到阻塞隊列 enq(node); return true; } // 到這裏是由於 CAS 失敗,確定是由於 signal 方法已經將 waitStatus 設置爲了 0 // signal 方法會將節點轉移到阻塞隊列,可是可能還沒完成,這邊自旋等待其完成 // 固然,這種事情仍是比較少的吧:signal 調用以後,沒完成轉移以前,發生了中斷 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
這裏再說一遍,即便發生了中斷,節點依然會轉移到阻塞隊列。
到這裏,你們應該都知道這個 while 循環怎麼退出了吧。要麼中斷,要麼轉移成功。
while 循環出來之後,下面是這段代碼:
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
因爲 while 出來後,咱們肯定節點已經進入了阻塞隊列,準備獲取鎖。
這裏的 acquireQueued(node, savedState) 的第一個參數 node 以前已經通過 enq(node) 進入了隊列,參數 savedState 是以前釋放鎖前的 state,這個方法返回的時候,表明當前線程獲取了鎖,並且 state == savedState了。
注意,前面咱們說過,無論有沒有發生中斷,都會進入到阻塞隊列,而 acquireQueued(node, savedState) 的返回值就是表明線程是否被中斷。若是返回 true,說明被中斷了,並且 interruptMode != THROW_IE,說明在 signal 以前就發生中斷了,這裏將 interruptMode 設置爲 REINTERRUPT,用於待會從新中斷。
繼續往下:
if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
本着一絲不苟的精神,這邊說說 node.nextWaiter != null
怎麼知足。我前面也說了 signal 的時候會將節點轉移到阻塞隊列,有一步是 node.nextWaiter = null,將斷開節點和條件隊列的聯繫。
但是,在判斷髮生中斷的狀況下,是 signal 以前仍是以後發生的?
這部分的時候,我也介紹了,若是 signal 以前就中斷了,也須要將節點進行轉移到阻塞隊列,這部分轉移的時候,是沒有設置 node.nextWaiter = null 的。
以前咱們說過,若是有節點取消,也會調用 unlinkCancelledWaiters 這個方法,就是這裏了。
到這裏,咱們終於能夠好好說下這個 interruptMode 幹嗎用了。
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
爲何這麼處理?這部分的知識在本文的最後一節
通過前面的 7 步,整個 ConditionObject 類基本上都分析完了,接下來簡單分析下帶超時機制的 await 方法。
public final long awaitNanos(long nanosTimeout) throws InterruptedException public final boolean awaitUntil(Date deadline) throws InterruptedException public final boolean await(long time, TimeUnit unit) throws InterruptedException
這三個方法都差很少,咱們就挑一個出來看看吧:
public final boolean await(long time, TimeUnit unit) throws InterruptedException { // 等待這麼多納秒 long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // 當前時間 + 等待時長 = 過時時間 final long deadline = System.nanoTime() + nanosTimeout; // 用於返回 await 是否超時 boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { // 時間到啦 if (nanosTimeout <= 0L) { // 這裏由於要 break 取消等待了。取消等待的話必定要調用 transferAfterCancelledWait(node) 這個方法 // 若是這個方法返回 true,在這個方法內,將節點轉移到阻塞隊列成功 // 返回 false 的話,說明 signal 已經發生,signal 方法將節點轉移了。也就是說沒有超時嘛 timedout = transferAfterCancelledWait(node); break; } // spinForTimeoutThreshold 的值是 1000 納秒,也就是 1 毫秒 // 也就是說,若是不到 1 毫秒了,那就不要選擇 parkNanos 了,自旋的性能反而更好 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 獲得剩餘時間 nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
超時的思路仍是很簡單的,不帶超時參數的 await 是 park,而後等待別人喚醒。而如今就是調用 parkNanos 方法來休眠指定的時間,醒來後判斷是否 signal 調用了,調用了就是沒有超時,不然就是超時了。超時的話,本身來進行轉移到阻塞隊列,而後搶鎖。
關於 Condition 最後一小節了。
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
很簡單,我就不廢話了。
這篇文章說的是 AbstractQueuedSynchronizer,只不過好像 Condition 說太多了,趕忙把思路拉回來。
接下來,我想說說怎麼取消對鎖的競爭?
上篇文章提到過,最重要的方法是這個,咱們要在這裏面找答案:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
首先,到這個方法的時候,節點必定是入隊成功的。
我把 parkAndCheckInterrupt() 代碼貼過來:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
這兩段代碼聯繫起來看,是否是就清楚了。
若是咱們要取消一個線程的排隊,咱們須要在另一個線程中對其進行中斷。好比某線程調用 lock() 老久不返回,我想中斷它。一旦對其進行中斷,此線程會從 LockSupport.park(this);
中喚醒,而後 Thread.interrupted();
返回 true。
咱們發現一個問題,即便是中斷喚醒了這個線程,也就只是設置了 interrupted = true
而後繼續下一次循環。並且,因爲 Thread.interrupted();
會清除中斷狀態,第二次進 parkAndCheckInterrupt 的時候,返回會是 false。
因此,咱們要看到,在這個方法中,interrupted 只是用來記錄是否發生了中斷,而後用於方法返回值,其餘沒有作任何相關事情。
因此,咱們看外層方法怎麼處理 acquireQueued 返回 false 的狀況。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static void selfInterrupt() { Thread.currentThread().interrupt(); }
因此說,lock() 方法處理中斷的方法就是,你中斷歸中斷,我搶鎖仍是照樣搶鎖,幾乎不要緊,只是我搶到鎖了之後,設置線程的中斷狀態而已,也不拋出任何異常出來。調用者獲取鎖後,能夠去檢查是否發生過中斷,也能夠不理會。
來條分割線。有沒有被騙的感受,我說了一大堆,但是和取消沒有任何關係啊。
咱們來看 ReentrantLock 的另外一個 lock 方法:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
方法上多了個 throws InterruptedException
,通過前面那麼多知識的鋪墊,這裏我就再也不囉裏囉嗦了。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
繼續往裏:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 就是這裏了,一旦異常,立刻結束這個方法,拋出異常。 // 這裏再也不只是標記這個方法的返回值表明中斷狀態 // 而是直接拋出異常,並且外層也不捕獲,一直往外拋到 lockInterruptibly throw new InterruptedException(); } } finally { // 若是經過 InterruptedException 異常出去,那麼 failed 就是 true 了 if (failed) cancelAcquire(node); } }
既然到這裏了,順便說說 cancelAcquire 這個方法吧:
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
到這裏,我想我應該把取消排隊這件事說清楚了吧。
在以前的文章中,咱們接觸了大量的中斷,這邊算是個總結吧。若是你徹底熟悉中斷了,沒有必要再看這節,本節爲新手而寫。
首先,咱們要明白,中斷不是相似 linux 裏面的命令 kill -9 pid,不是說咱們中斷某個線程,這個線程就中止運行了。中斷表明線程狀態,每一個線程都關聯了一箇中斷狀態,是一個 true 或 false 的 boolean 值,初始值爲 false。
關於中斷狀態,咱們須要重點關注如下幾個方法:
// Thread 類中的實例方法,持有線程實例引用便可檢測線程中斷狀態 public boolean isInterrupted() {} // Thread 中的靜態方法,檢測調用這個方法的線程是否已經中斷 // 注意:這個方法返回中斷狀態的同時,會將此線程的中斷狀態重置爲 false // 因此,若是咱們連續調用兩次這個方法的話,第二次的返回值確定就是 false 了 public static boolean interrupted() {} // Thread 類中的實例方法,用於設置一個線程的中斷狀態爲 true public void interrupt() {}
咱們說中斷一個線程,其實就是設置了線程的 interrupted status 爲 true,至於說被中斷的線程怎麼處理這個狀態,那是那個線程本身的事。如如下代碼:
while (!Thread.interrupted()) { doWork(); System.out.println("我作完一件事了,準備作下一件,若是沒有其餘線程中斷個人話"); }
固然,中斷除了是線程狀態外,還有其餘含義,不然也不須要專門搞一個這個概念出來了。
若是線程處於如下三種狀況,那麼當線程被中斷的時候,能自動感知到:
來自 Object 類的 wait()、wait(long)、wait(long, int),
來自 Thread 類的 join()、join(long)、join(long, int)、sleep(long)、sleep(long, int)
這幾個方法的相同之處是,方法上都有: throws InterruptedException若是線程阻塞在這些方法上(咱們知道,這些方法會讓當前線程阻塞),這個時候若是其餘線程對這個線程進行了中斷,那麼這個線程會從這些方法中當即返回,拋出 InterruptedException 異常,同時重置中斷狀態爲 false。
實現了 InterruptibleChannel 接口的類中的一些 I/O 阻塞操做,如 DatagramChannel 中的 connect 方法和 receive 方法等
若是線程阻塞在這裏,中斷線程會致使這些方法拋出 ClosedByInterruptException 並重置中斷狀態。
Selector 中的 select 方法,這個有機會咱們在講 NIO 的時候說
一旦中斷,方法當即返回
對於以上 3 種狀況是最特殊的,由於他們能自動感知到中斷(這裏說自動,固然也是基於底層實現),而且在作出相應的操做後都會重置中斷狀態爲 false。
那是否是隻有以上 3 種方法能自動感知到中斷呢?不是的,若是線程阻塞在 LockSupport.park(Object obj) 方法,也叫掛起,這個時候的中斷也會致使線程喚醒,可是喚醒後不會重置中斷狀態,因此喚醒後去檢測中斷狀態將是 true。
它是一個特殊的異常,不是說 JVM 對其有特殊的處理,而是它的使用場景比較特殊。一般,咱們能夠看到,像 Object 中的 wait() 方法,ReentrantLock 中的 lockInterruptibly() 方法,Thread 中的 sleep() 方法等等,這些方法都帶有 throws InterruptedException
,咱們一般稱這些方法爲阻塞方法(blocking method)。
阻塞方法一個很明顯的特徵是,它們須要花費比較長的時間(不是絕對的,只是說明時間不可控),還有它們的方法結束返回每每依賴於外部條件,如 wait 方法依賴於其餘線程的 notify,lock 方法依賴於其餘線程的 unlock等等。
當咱們看到方法上帶有 throws InterruptedException
時,咱們就要知道,這個方法應該是阻塞方法,咱們若是但願它能早點返回的話,咱們每每能夠經過中斷來實現。
除了幾個特殊類(如 Object,Thread等)外,感知中斷並提早返回是經過輪詢中斷狀態來實現的。咱們本身須要寫可中斷的方法的時候,就是經過在合適的時機(一般在循環的開始處)去判斷線程的中斷狀態,而後作相應的操做(一般是方法直接返回或者拋出異常)。固然,咱們也要看到,若是咱們一次循環花的時間比較長的話,那麼就須要比較長的時間才能注意到線程中斷了。
一旦中斷髮生,咱們接收到了這個信息,而後怎麼去處理中斷呢?本小節將簡單分析這個問題。
咱們常常會這麼寫代碼:
try { Thread.sleep(10000); } catch (InterruptedException e) { // ignore } // go on
當 sleep 結束繼續往下執行的時候,咱們每每都不知道這塊代碼是真的 sleep 了 10 秒,仍是隻休眠了 1 秒就被中斷了。這個代碼的問題在於,咱們將這個異常信息吞掉了。(對於 sleep 方法,我相信大部分狀況下,咱們都不在乎是不是中斷了,這裏是舉例)
AQS 的作法很值得咱們借鑑,咱們知道 ReentrantLock 有兩種 lock 方法:
public void lock() { sync.lock(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
前面咱們提到過,lock() 方法不響應中斷。若是 thread1 調用了 lock() 方法,過了好久還沒搶到鎖,這個時候 thread2 對其進行了中斷,thread1 是不響應這個請求的,它會繼續搶鎖,固然它不會把「被中斷」這個信息扔掉。咱們能夠看如下代碼:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 咱們看到,這裏也沒作任何特殊處理,就是記錄下來中斷狀態。 // 這樣,若是外層方法須要去檢測的時候,至少咱們沒有把這個信息丟了 selfInterrupt();// Thread.currentThread().interrupt(); }
而對於 lockInterruptibly() 方法,由於其方法上面有 throws InterruptedException
,這個信號告訴咱們,若是咱們要取消線程搶鎖,直接中斷這個線程便可,它會當即返回,拋出 InterruptedException 異常。
在併發包中,有很是多的這種處理中斷的例子,提供兩個方法,分別爲響應中斷和不響應中斷,對於不響應中斷的方法,記錄中斷而不是丟失這個信息。如 Condition 中的兩個方法就是這樣的:
void await() throws InterruptedException; void awaitUninterruptibly();
一般,若是方法會拋出 InterruptedException 異常,每每方法體的第一句就是:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ...... }
熟練使用中斷,對於咱們寫出優雅的代碼是有幫助的,也有助於咱們分析別人的源碼。
參考: https://www.ibm.com/developerworks/library/j-jtp05236/index.html翻譯:https://www.ibm.com/developerworks/cn/java/j-jtp05236.html
更多內容請關注微信公衆號【Java技術江湖】
一位阿里 Java 工程師的技術小站。做者黃小斜,專一 Java 相關技術:SSM、SpringBoot、MySQL、分佈式、中間件、集羣、Linux、網絡、多線程,偶爾講點Docker、ELK,同時也分享技術乾貨和學習經驗,致力於Java全棧開發!(關注公衆號後回覆」Java「便可領取 Java基礎、進階、項目和架構師等免費學習資料,更有數據庫、分佈式、微服務等熱門技術學習視頻,內容豐富,兼顧原理和實踐,另外也將贈送做者原創的Java學習指南、Java程序員面試指南等乾貨資源)