本文是 ReentrantLock 源碼的第二篇,第一篇主要介紹了公平鎖非公平鎖正常的加鎖解鎖流程,雖然表達能力有限不知道有沒有講清楚,本着不太監的原則,本文填補下第一篇中挖的坑。html
若是咱們但願檢測到中斷後能馬上拋出異常就用 lockInterruptibly 方法去加鎖,仍是建議用 lock 方法,自定義中斷處理,更靈活一點。node
咱們只須要把 ReentrantLock#lock 改爲 ReentrantLock#lockInterruptibly 方法就能夠得到內部檢測中斷的鎖了多線程
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
主要流程和前文介紹的相似併發
public final void acquireInterruptibly(int arg) throws InterruptedException { // 一上來就檢查下中斷,中斷直接異常,就不必搶鎖排隊了 if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
和正常加鎖惟一區別就是這個方法,可是定睛一看是否是似曾相識?最大區別就是把中斷標識給去掉了,檢測到中斷直接拋異常源碼分析
private void doAcquireInterruptibly(int arg) throws InterruptedException { // 大神也偷懶了,由於這個方法,只有獨佔鎖且檢查中斷這一個應用場景,把節點入隊的步驟也揉了進來 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 當線程拿到鎖甦醒過來,發現本身掛起過程被中斷了,直接拋出異常 throw new InterruptedException(); } } finally { // 只要發生了中斷異常,就會進取消加鎖方法 if (failed) cancelAcquire(node); } }
此方法頗有東西,只保證該節點失效,而後延遲移出等待隊列性能
private void cancelAcquire(Node node) { if (node == null) return; // 把節點裏登記等待的線程去掉,完成這一步此節點已經沒有做用了 node.thread = null; // 下面的三步其實能夠放到一個CAS中,直接設置 CANCELLED 狀態 ,拿前一個節點,predNext 也必然是本身,可是吞吐量就下來了 // 這裏大神,沒有這樣作也是出於了性能考慮,由於咱們已經把等待線程設置成 null 了,因此此節點已經沒有任何意義,沒有必要去保證節點第一時間被釋放,只要設置好 CANCELLED 狀態 // 就算後面 CAS 調整等待隊列失敗了,下次取消操做也會幫着回收。相應地代碼複雜度提升了。 /* ----------------------------------------- */ // 找到本身前面第一個沒取消的節點, Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 主要是爲了下面把鏈表接上 Node predNext = pred.next; // 這裏邏輯上把當前節點的狀態設置成取消,便於檢測釋放 node.waitStatus = Node.CANCELLED; /* ----------------------------------------- */ // 若是當前節點是尾節點,就把前一個沒取消的節點設成新尾巴 if (node == tail && compareAndSetTail(node, pred)) { // 把新尾巴的 next 設置成空 compareAndSetNext(pred, predNext, null); } else { // 進到這裏說明當前節點確定不是尾節點了 int ws; // 條件1: 若是前一個非取消節點不是頭,也就是還須要排隊 // 條件2: 若是前一個節點爲 SIGNAL,也就是說後面確定還有線程等待被喚醒 // 條件3: 若是前一個節點也取消了,說明前一個節點也取消了,還沒來得及設置狀態 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) // 當前節點後一個沒取消的話,就接到前一個正常的節點後面 compareAndSetNext(pred, predNext, next); } else { // 前一篇文章解鎖部分講過,會把下一個節點中的線程恢復,而後把後繼節點接上 unparkSuccessor(node); } // 有點花裏胡哨,直接 = null不行麼, node.next = node; // help GC } }
來張圖說明下,假如咱們目前等待隊列裏有7個線程:ui
上篇文章看源碼過程當中,AQS中有個 CONDITION 狀態沒有研究this
static final int CONDITION = -2;
ReentrantLock 中的 newCondition 等 Condition 相關方法正是基於 AQS 中的實現的,讓咱們先大體瞭解一波做用和用法線程
Condition 相似於 Object 中的 wait 和 notify ,主要用於線程間通訊,最大的優點是 Object 的 wait 是把線程放到當前對象的等待池中,也就是說一個對象只能有一個等待條件,而 Condition 能夠支持多個等待條件,舉個例子,商品要等至少三我的預約了纔開始發售,第一個預約的減500,第二三兩個減100。正式發售以後恢復原價。
public class ReentrantLockConditionDemo { private final ReentrantLock reentrantLock = new ReentrantLock(); private final Condition wait1 = reentrantLock.newCondition(); private final Condition wait2 = reentrantLock.newCondition(); private int wait1Count = 0; private int wait2Count = 0; public void buy() { int price = 999; reentrantLock.lock(); try { while (wait1Count++ < 1) { System.out.println(Thread.currentThread().getName() + "減500"); wait1.await(); price -= 500; } wait1.signal(); while (wait2Count++ < 2) { System.out.println(Thread.currentThread().getName() + "減100"); wait2.await(); price -= 100; } wait2.signal(); System.out.println(Thread.currentThread().getName() + "到手價" + price); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); ReentrantLockConditionDemo reentrantLockConditionDemo = new ReentrantLockConditionDemo(); IntStream.rangeClosed(0, 4) .forEach(num -> executorService .execute(reentrantLockConditionDemo::buy) ); } /** * 輸出: * * pool-1-thread-1減500 * pool-1-thread-2減100 * pool-1-thread-3減100 * pool-1-thread-4到手價999 * pool-1-thread-5到手價999 * pool-1-thread-1到手價499 * pool-1-thread-2到手價899 * pool-1-thread-3到手價899 */ }
先來看條件的建立,須要基於鎖對象使用 newCondition 去建立
public Condition newCondition() { return sync.newCondition(); } final ConditionObject newCondition() { // ConditionObject 是 AQS 中對 Condition 的實現 return new ConditionObject(); }
上一篇文章中介紹了 Node 結構,這裏條件也使用了這個節點定義了一個單鏈表,統稱爲條件隊列,上一篇介紹統稱同步隊列。條件隊列結構至關簡單就不單獨畫圖了。
// 條件隊列頭 private transient Node firstWaiter; // 條件隊列尾 private transient Node lastWaiter; // 由於默認感知中斷,須要考慮如何處理 // 退出條件隊列時從新設置中斷位 private static final int REINTERRUPT = 1; // 退出條件隊列時直接拋異常 private static final int THROW_IE = -1;
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 到條件隊列中排隊,下文詳解 Node node = addConditionWaiter(); // 此方法比較簡單,就是調用前一篇講過的 release 方法釋放鎖(調用 await 時一定是鎖的持有者) // savedState 是進入條件隊列前,持有鎖的數量 // 失敗會直接拋出異常,而且最終把節點狀態設置爲 CANCELLED int savedState = fullyRelease(node); int interruptMode = 0; // 判斷在不在同步隊列(當調用signal以後會從條件隊列移到同步隊列),此判斷很簡單:節點狀態是 CONDITION 確定 false,不然就到同步隊列中去找 while (!isOnSyncQueue(node)) { // 掛起 LockSupport.park(this); // 檢查是否是由於中斷被喚醒的,下文詳解 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 上一篇介紹過acquireQueued自旋搶鎖,若是搶到鎖了,而且中斷模式不是 -1(默認0),就記錄中斷模式爲1,表示須要從新設置中斷 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除條件隊列中取消的節點 if (node.nextWaiter != null) // 下文詳解,在addConditionWaiter方法中也有用到 unlinkCancelledWaiters(); // 處理中斷 if (interruptMode != 0) // 1:再次中斷 -1:拋出異常 reportInterruptAfterWait(interruptMode); }
加入條件隊列
private Node addConditionWaiter() { Node t = lastWaiter; // 若是條件隊列最後一個節點取消了,就清理 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 新建一個 waitStatus = -2 的節點 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 下面是簡單的單鏈表操做,以前同步隊列入隊用的 CAS 操做,由於會有不少線程去搶鎖,而線程進入條件隊列必定是拿到鎖了,不知足條件了,因此不存在併發問題 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
private void unlinkCancelledWaiters() { Node t = firstWaiter; // 輔助變量,用於接尾巴,trail始終等於循環中當前節點t的上一個不是取消狀態的節點 Node trail = null; while (t != null) { Node next = t.nextWaiter; // 判斷當前節點有沒有取消 if (t.waitStatus != Node.CONDITION) { // 斷當前節點鏈 t.nextWaiter = null; // trail == null 說明目前條件隊列裏面全取消了 if (trail == null) // 頭節點指向第一個沒取消的節點 firstWaiter = next; else // trail 是 t 的前一個節點,也就是踢出了 t trail.nextWaiter = next; // 若是最後一個節點取消了,那須要改一下尾指針 if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
上文 await 方法中,線程一旦喚醒會先檢查中斷
private int checkInterruptWhileWaiting(Node node) { // 沒中斷,返回0,中斷了須要放回同步隊列 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
// 若是 final boolean transferAfterCancelledWait(Node node) { // 把由於中斷醒來的節點,設置狀態爲全新的節點,從條件隊列放入同步隊列 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } // 上面改狀態爲何要 CAS ? 若是中斷喚醒的同時被 signal 喚醒了,在 signal 入隊成功以前讓出cpu,可是不釋放鎖 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
單個喚醒和喚醒因此掉的方法相似,看一個單個喚醒流程就可
public final void signal() { // 若是持有鎖的線程不是當前線程就拋異常,也就是隻有得到鎖的線程能夠執行喚醒操做 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 通知條件隊列中的第一個節點,也就是等的最久的節點 if (first != null) doSignal(first); }
private void doSignal(Node first) { do { // 把 first 斷鏈 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 若是轉移到同步隊列失敗了,而且還有條件隊列不爲空就喚醒下一個 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { // 若是節點取消了,轉移失敗 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 這裏的 p 是 node 在同步隊列裏的前驅節點 Node p = enq(node); int ws = p.waitStatus; // 看過上一篇文章應該有映像,只要是進同步隊列,都須要把前一個節點狀態設爲 -1 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 若是取消了,或者狀態設置失敗,喚醒後繼續掛起 LockSupport.unpark(node.thread); return true; }
最後按照慣例結合上面的案例,畫張圖總結下: