我在Java併發包源碼學習系列:AbstractQueuedSynchronizer#同步隊列與Node節點已經粗略地介紹了一下CLH的結構,本篇主要解析該同步隊列的相關操做,所以在這邊再回顧一下:html
AQS經過內置的FIFO同步雙向隊列來完成資源獲取線程的排隊工做,內部經過節點head【其實是虛擬節點,真正的第一個線程在head.next的位置】和tail記錄隊首和隊尾元素,隊列元素類型爲Node。java
接下來將要經過AQS以獨佔式的獲取和釋放資源的具體案例來詳解內置CLH阻塞隊列的工做流程,接着往下看吧。node
public final void acquire(int arg) { if (!tryAcquire(arg) && // tryAcquire由子類實現,表示獲取鎖,若是成功,這個方法直接返回了 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若是獲取失敗,執行 selfInterrupt(); }
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
的邏輯,咱們能夠將其進行拆分,分爲兩步:
selfInterrupt()
的邏輯,進行阻塞。接下來咱們分別來看看addWaiter
和acquireQueued
兩個方法。編程
根據傳入的mode參數決定獨佔或共享模式,爲當前線程建立節點,併入隊。多線程
// 其實就是把當前線程包裝一下,設置模式,造成節點,加入隊列 private Node addWaiter(Node mode) { // 根據mode和thread建立節點 Node node = new Node(Thread.currentThread(), mode); // 記錄一下原尾節點 Node pred = tail; // 尾節點不爲null,隊列不爲空,快速嘗試加入隊尾。 if (pred != null) { // 讓node的prev指向尾節點 node.prev = pred; // CAS操做設置node爲新的尾節點,tail = node if (compareAndSetTail(pred, node)) { // 設置成功,讓原尾節點的next指向新的node,實現雙向連接 pred.next = node; // 入隊成功,返回 return node; } } // 快速入隊失敗,進行不斷嘗試 enq(node); return node; }
幾個注意點:併發
private Node enq(final Node node) { // 自旋,俗稱死循環,直到設置成功爲止 for (;;) { // 記錄原尾節點 Node t = tail; // 第一種狀況:隊列爲空,原先head和tail都爲null, // 經過CAS設置head爲哨兵節點,若是設置成功,tail也指向哨兵節點 if (t == null) { // Must initialize // 初始化head節點 if (compareAndSetHead(new Node())) // tail指向head,下個線程來的時候,tail就不爲null了,就走到了else分支 tail = head; // 第二種狀況:CAS設置尾節點失敗的狀況,和addWaiter同樣,只不過它在for(;;)中 } else { // 入隊,將新節點的prev指向tail node.prev = t; // CAS設置node爲尾部節點 if (compareAndSetTail(t, node)) { //原來的tail的next指向node t.next = node; return t; } } } }
enq的過程是自選設置隊尾的過程,若是設置成功,就返回。若是設置失敗,則一直嘗試設置,理念就是,我總能等待設置成功那一天。app
咱們還能夠發現,head是延遲初始化的,在第一個節點嘗試入隊的時候,head爲null,這時使用了new Node()
建立了一個不表明任何線程的節點,做爲虛擬頭節點,且咱們須要注意它的waitStatus初始化爲0,這一點對咱們以後分析有指導意義。ide
若是是CAS失敗致使重複嘗試,那就仍是讓他繼續CAS好了。oop
// 這個方法若是返回true,代碼將進入selfInterrupt() final boolean acquireQueued(final Node node, int arg) { // 注意默認爲true boolean failed = true; try { // 是否中斷 boolean interrupted = false; // 自旋,即死循環 for (;;) { // 獲得node的前驅節點 final Node p = node.predecessor(); // 咱們知道head是虛擬的頭節點,p==head表示若是node爲阻塞隊列的第一個真實節點 // 就執行tryAcquire邏輯,這裏tryAcquire也須要由子類實現 if (p == head && tryAcquire(arg)) { // tryAcquire獲取成功走到這,執行setHead出隊操做 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 走到這有兩種狀況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了 // 這裏就判斷 若是當前線程爭鎖失敗,是否須要掛起當前這個線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 死循環退出,只有tryAcquire獲取鎖失敗的時候failed才爲true if (failed) cancelAcquire(node); } }
CLU同步隊列遵循FIFO,首節點的線程釋放同步狀態後,喚醒下一個節點。將隊首節點出隊的操做實際上就是,將head指針指向將要出隊的節點就能夠了。源碼分析
private void setHead(Node node) { // head指針指向node head = node; // 釋放資源 node.thread = null; node.prev = null; }
/** * 走到這有兩種狀況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了 * 這裏就判斷 若是當前線程爭鎖失敗,是否須要掛起當前這個線程 * * 這裏pred是前驅節點, node就是當前節點 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驅節點的waitStatus int ws = pred.waitStatus; // 前驅節點爲SIGNAL【-1】直接返回true,表示當前節點能夠被直接掛起 if (ws == Node.SIGNAL) return true; // ws>0 CANCEL 說明前驅節點取消了排隊 if (ws > 0) { // 下面這段循環其實就是跳過全部取消的節點,找到第一個正常的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 將該節點的後繼指向node,創建雙向鏈接 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. * 官方說明:走到這waitStatus只能是0或propagate,默認狀況下,當有新節點入隊時,waitStatus老是爲0 * 下面用CAS操做將前驅節點的waitStatus值設置爲signal */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回false,接着會再進入循環,此時前驅節點爲signal,返回true return false; }
針對前驅節點的waitStatus有三種狀況:
等待狀態不會爲
Node.CONDITION
,由於它用在 ConditonObject 中
PROPAGATE表示共享模式下,前驅節點不只會喚醒後繼節點,同時也可能會喚醒後繼的後繼。
咱們能夠發現,這個方法在第一次走進來的時候是不會返回true的。緣由在於,返回true的條件時前驅節點的狀態爲SIGNAL,而第一次的時候尚未給前驅節點設置SIGNAL呢,只有在CAS設置了狀態以後,第二次進來纔會返回true。
那SIGNAL的意義究竟是什麼呢?
這裏引用:併發編程——詳解 AQS CLH 鎖 # 爲何 AQS 須要一個虛擬 head 節點
waitStatus這裏用ws簡稱,每一個節點都有ws變量,用於表示該節點的狀態。初始化的時候爲0,若是被取消爲1,signal爲-1。
若是某個節點的狀態是signal的,那麼在該節點釋放鎖的時候,它須要喚醒下一個節點。
所以,每一個節點在休眠以前,若是沒有將前驅節點的ws設置爲signal,那麼它將永遠沒法被喚醒。
所以咱們會發現上面當前驅節點的ws爲0或propagate的時候,採用cas操做將ws設置爲signal,目的就是讓上一個節點釋放鎖的時候可以通知本身。
private final boolean parkAndCheckInterrupt() { // 掛起當前線程 LockSupport.park(this); return Thread.interrupted(); }
shouldParkAfterFailedAcquire方法返回true以後,就會調用該方法,掛起當前線程。
LockSupport.park(this)
方法掛起的線程有兩種途徑被喚醒:1.被unpark() 2.被interrupt()。
須要注意這裏的Thread.interrupted()會清除中斷標記位。
上面tryAcquire獲取鎖失敗的時候,會走到這個方法。
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; // 將節點的線程置空 node.thread = null; // 跳過全部的取消的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. // 這裏在沒有併發的狀況下,preNext和node是一致的 Node predNext = pred.next; // Can use unconditional write instead of CAS here. 能夠直接寫而不是用CAS // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 設置node節點爲取消狀態 node.waitStatus = Node.CANCELLED; // 若是node爲尾節點就CAS將pred設置爲新尾節點 if (node == tail && compareAndSetTail(node, pred)) { // 設置成功以後,CAS將pred的下一個節點置爲空 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && // pred不是首節點 ((ws = pred.waitStatus) == Node.SIGNAL || // pred的ws爲SIGNAL 或 能夠被CAS設置爲SIGNAL (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // pred線程非空 // 保存node 的下一個節點 Node next = node.next; // node的下一個節點不是cancelled,就cas設置pred的下一個節點爲next if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 上面的狀況除外,則走到這個分支,喚醒node的下一個可喚醒節點線程 unparkSuccessor(node); } node.next = node; // help GC } }
public final boolean release(int arg) { if (tryRelease(arg)) { // 子類實現tryRelease方法 // 得到當前head Node h = head; // head不爲null而且head的等待狀態不爲0 if (h != null && h.waitStatus != 0) // 喚醒下一個能夠被喚醒的線程,不必定是next哦 unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 若是node的waitStatus<0爲signal,CAS修改成0 // 將 head 節點的 ws 改爲 0,清除信號。表示,他已經釋放過了。不能重複釋放。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 喚醒後繼節點,可是有可能後繼節點取消了等待 即 waitStatus == 1 Node s = node.next; // 若是後繼節點爲空或者它已經放棄鎖了 if (s == null || s.waitStatus > 0) { s = null; // 從隊尾往前找,找到沒有沒取消的全部節點排在最前面的【直到t爲null或t==node才退出循環嘛】 for (Node t = tail; t != null && t != node; t = t.prev) // 若是>0表示節點被取消了,就一直向前找唄,找到以後不會return,還會一直向前 if (t.waitStatus <= 0) s = t; } // 若是後繼節點存在且沒有被取消,會走到這,直接喚醒後繼節點便可 if (s != null) LockSupport.unpark(s.thread); }