(1)條件鎖是什麼?java
(2)條件鎖適用於什麼場景?node
(3)條件鎖的await()是在其它線程signal()的時候喚醒的嗎?源碼分析
條件鎖,是指在獲取鎖以後發現當前業務場景本身沒法處理,而須要等待某個條件的出現才能夠繼續處理時使用的一種鎖。學習
好比,在阻塞隊列中,當隊列中沒有元素的時候是沒法彈出一個元素的,這時候就須要阻塞在條件notEmpty上,等待其它線程往裏面放入一個元素後,喚醒這個條件notEmpty,當前線程才能夠繼續去作「彈出一個元素」的行爲。ui
注意,這裏的條件,必須是在獲取鎖以後去等待,對應到ReentrantLock的條件鎖,就是獲取鎖以後才能調用condition.await()方法。this
在java中,條件鎖的實現都在AQS的ConditionObject類中,ConditionObject實現了Condition接口,下面咱們經過一個例子來進入到條件鎖的學習中。線程
public class ReentrantLockTest { public static void main(String[] args) throws InterruptedException { // 聲明一個重入鎖 ReentrantLock lock = new ReentrantLock(); // 聲明一個條件鎖 Condition condition = lock.newCondition(); new Thread(()->{ try { lock.lock(); // 1 try { System.out.println("before await"); // 2 // 等待條件 condition.await(); // 3 System.out.println("after await"); // 10 } finally { lock.unlock(); // 11 } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 這裏睡1000ms是爲了讓上面的線程先獲取到鎖 Thread.sleep(1000); lock.lock(); // 4 try { // 這裏睡2000ms表明這個線程執行業務須要的時間 Thread.sleep(2000); // 5 System.out.println("before signal"); // 6 // 通知條件已成立 condition.signal(); // 7 System.out.println("after signal"); // 8 } finally { lock.unlock(); // 9 } } }
上面的代碼很簡單,一個線程等待條件,另外一個線程通知條件已成立,後面的數字表明代碼實際運行的順序,若是你能把這個順序看懂基本條件鎖掌握得差很少了。指針
public class ConditionObject implements Condition, java.io.Serializable { /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; }
能夠看到條件鎖中也維護了一個隊列,爲了和AQS的隊列區分,我這裏稱爲條件隊列,firstWaiter是隊列的頭節點,lastWaiter是隊列的尾節點,它們是幹什麼的呢?接着看。code
新建一個條件鎖。接口
// ReentrantLock.newCondition() public Condition newCondition() { return sync.newCondition(); } // ReentrantLock.Sync.newCondition() final ConditionObject newCondition() { return new ConditionObject(); } // AbstractQueuedSynchronizer.ConditionObject.ConditionObject() public ConditionObject() { }
新建一個條件鎖最後就是調用的AQS中的ConditionObject類來實例化條件鎖。
condition.await()方法,代表如今要等待條件的出現。
// AbstractQueuedSynchronizer.ConditionObject.await() public final void await() throws InterruptedException { // 若是線程中斷了,拋出異常 if (Thread.interrupted()) throw new InterruptedException(); // 添加節點到Condition的隊列中,並返回該節點 Node node = addConditionWaiter(); // 徹底釋放當前線程獲取的鎖 // 由於鎖是可重入的,因此這裏要把獲取的鎖所有釋放 int savedState = fullyRelease(node); int interruptMode = 0; // 是否在同步隊列中 while (!isOnSyncQueue(node)) { // 阻塞當前線程 LockSupport.park(this); // 上面部分是調用await()時釋放本身佔有的鎖,並阻塞本身等待條件的出現 // *************************分界線************************* // // 下面部分是條件已經出現,嘗試去獲取鎖 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 嘗試獲取鎖,注意第二個參數,這是上一章分析過的方法 // 若是沒獲取到會再次阻塞(這個方法這裏就不貼出來了,有興趣的翻翻上一章的內容) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除取消的節點 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 線程中斷相關 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter private Node addConditionWaiter() { Node t = lastWaiter; // 若是條件隊列的尾節點已取消,從頭節點開始清除全部已取消的節點 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); // 從新獲取尾節點 t = lastWaiter; } // 新建一個節點,它的等待狀態是CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // 若是尾節點爲空,則把新節點賦值給頭節點(至關於初始化隊列) // 不然把新節點賦值給尾節點的nextWaiter指針 if (t == null) firstWaiter = node; else t.nextWaiter = node; // 尾節點指向新節點 lastWaiter = node; // 返回新節點 return node; } // AbstractQueuedSynchronizer.fullyRelease final int fullyRelease(Node node) { boolean failed = true; try { // 獲取狀態變量的值,重複獲取鎖,這個值會一直累加 // 因此這個值也表明着獲取鎖的次數 int savedState = getState(); // 一次性釋放全部得到的鎖 if (release(savedState)) { failed = false; // 返回獲取鎖的次數 return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } // AbstractQueuedSynchronizer.isOnSyncQueue final boolean isOnSyncQueue(Node node) { // 若是等待狀態是CONDITION,或者前一個指針爲空,返回false // 說明尚未移到AQS的隊列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是next指針有值,說明已經移到AQS的隊列中了 if (node.next != null) // If has successor, it must be on queue return true; // 從AQS的尾節點開始往前尋找看是否能夠找到當前節點,找到了也說明已經在AQS的隊列中了 return findNodeFromTail(node); }
這裏有幾個難理解的點:
(1)Condition的隊列和AQS的隊列不徹底同樣;
AQS的隊列頭節點是不存在任何值的,是一個虛節點; Condition的隊列頭節點是存儲着實實在在的元素值的,是真實節點。
(2)各類等待狀態(waitStatus)的變化;
首先,在條件隊列中,新建節點的初始等待狀態是CONDITION(-2); 其次,移到AQS的隊列中時等待狀態會更改成0(AQS隊列節點的初始等待狀態爲0); 而後,在AQS的隊列中若是須要阻塞,會把它上一個節點的等待狀態設置爲SIGNAL(-1); 最後,無論在Condition隊列仍是AQS隊列中,已取消的節點的等待狀態都會設置爲CANCELLED(1); 另外,後面咱們在共享鎖的時候還會講到另一種等待狀態叫PROPAGATE(-3)。
(3)類似的名稱;
AQS中下一個節點是next,上一個節點是prev; Condition中下一個節點是nextWaiter,沒有上一個節點。
若是弄明白了這幾個點,看懂上面的代碼仍是輕鬆加愉快的,若是沒弄明白,彤哥這裏指出來了,但願您回頭再看看上面的代碼。
下面總結一下await()方法的大體流程:
(1)新建一個節點加入到條件隊列中去;
(2)徹底釋放當前線程佔有的鎖;
(3)阻塞當前線程,並等待條件的出現;
(4)條件已出現(此時節點已經移到AQS的隊列中),嘗試獲取鎖;
也就是說await()方法內部實際上是先釋放鎖->等待條件->再次獲取鎖
的過程。
condition.signal()方法通知條件已經出現。
// AbstractQueuedSynchronizer.ConditionObject.signal public final void signal() { // 若是不是當前線程佔有着鎖,調用這個方法拋出異常 // 說明signal()也要在獲取鎖以後執行 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 條件隊列的頭節點 Node first = firstWaiter; // 若是有等待條件的節點,則通知它條件已成立 if (first != null) doSignal(first); } // AbstractQueuedSynchronizer.ConditionObject.doSignal private void doSignal(Node first) { do { // 移到條件隊列的頭節點日後一位 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 至關於把頭節點從隊列中出隊 first.nextWaiter = null; // 轉移節點到AQS隊列中 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // AbstractQueuedSynchronizer.transferForSignal final boolean transferForSignal(Node node) { // 把節點的狀態更改成0,也就是說即將移到AQS隊列中 // 若是失敗了,說明節點已經被改爲取消狀態了 // 返回false,經過上面的循環可知會尋找下一個可用節點 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 調用AQS的入隊方法把節點移到AQS的隊列中 // 注意,這裏enq()的返回值是node的上一個節點,也就是舊尾節點 Node p = enq(node); // 上一個節點的等待狀態 int ws = p.waitStatus; // 若是上一個節點已取消了,或者更新狀態爲SIGNAL失敗(也是說明上一個節點已經取消了) // 則直接喚醒當前節點對應的線程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 若是更新上一個節點的等待狀態爲SIGNAL成功了 // 則返回true,這時上面的循環不成立了,退出循環,也就是隻通知了一個節點 // 此時當前節點仍是阻塞狀態 // 也就是說調用signal()的時候並不會真正喚醒一個節點 // 只是把節點從條件隊列移到AQS隊列中 return true; }
signal()方法的大體流程爲:
(1)從條件隊列的頭節點開始尋找一個非取消狀態的節點;
(2)把它從條件隊列移到AQS隊列;
(3)且只移動一個節點;
注意,這裏調用signal()方法後並不會真正喚醒一個節點,那麼,喚醒一個節點是在啥時候呢?
還記得開頭例子嗎?倒回去再好好看看,signal()方法後,最終會執行lock.unlock()方法,此時纔會真正喚醒一個節點,喚醒的這個節點若是曾經是條件節點的話又會繼續執行await()方法「分界線」下面的代碼。
結束了,仔細體會下^^
若是非要用一個圖來表示的話,我想下面這個圖能夠大體表示一下(這裏是用時序圖畫的,可是實際並不能算做一個真正的時序圖哈,瞭解就好):
(1)重入鎖是指可重複獲取的鎖,即一個線程獲取鎖以後再嘗試獲取鎖時會自動獲取鎖;
(2)在ReentrantLock中重入鎖是經過不斷累加state變量的值實現的;
(3)ReentrantLock的釋放要跟獲取匹配,即獲取了幾回也要釋放幾回;
(4)ReentrantLock默認是非公平模式,由於非公平模式效率更高;
(5)條件鎖是指爲了等待某個條件出現而使用的一種鎖;
(6)條件鎖比較經典的使用場景就是隊列爲空時阻塞在條件notEmpty上;
(7)ReentrantLock中的條件鎖是經過AQS的ConditionObject內部類實現的;
(8)await()和signal()方法都必須在獲取鎖以後釋放鎖以前使用;
(9)await()方法會新建一個節點放到條件隊列中,接着徹底釋放鎖,而後阻塞當前線程並等待條件的出現;
(10)signal()方法會尋找條件隊列中第一個可用節點移到AQS隊列中;
(11)在調用signal()方法的線程調用unlock()方法才真正喚醒阻塞在條件上的節點(此時節點已經在AQS隊列中);
(12)以後該節點會再次嘗試獲取鎖,後面的邏輯與lock()的邏輯基本一致了。
爲何java有自帶的關鍵字synchronized了還須要實現一個ReentrantLock呢?
首先,它們都是可重入鎖;
其次,它們都默認是非公平模式;
而後,...,呃,咱們下一章繼續深刻探討 ReentrantLock VS synchronized。
歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。