AbstractQueuedSynchronizer,簡稱AQS。AQS定義了一個抽象的隊列來進行同步操做,不少同步類都依賴於它,例如經常使用的ReentrantLock/Semaphore/CountDownLatch等node
每一個node維護了一份volatile int state(表明共享狀態)和一個FIFO線程隊列(多線程爭用資源阻塞時進入該隊列),AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。安全
不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現如下幾種方法:微信
//嘗試獲取獨佔模式 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //嘗試釋放獨佔模式 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //共享式獲取同步狀態 //返回負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //共享式釋放同步狀態;若是釋放後容許喚醒後續等待結點返回true,不然返回false。 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //當前同步器是否在獨佔模式下被線程佔用,通常該方法表示是否被當前線程所獨佔;只有用到condition才須要去實現它。 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態的管理,當前線程若是獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構形成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。多線程
在CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),其定義以下:app
static final class Node { //共享模式 static final Node SHARED = new Node(); //獨佔模式 static final Node EXCLUSIVE = null; //由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態; static final int CANCELLED = 1; //後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行 static final int SIGNAL = -1; //節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,改節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 static final int CONDITION = -2; //表示下一次共享式同步狀態獲取將會無條件地傳播下去 static final int PROPAGATE = -3; //等待狀態 volatile int waitStatus; //前驅節點 volatile Node prev; //後繼節點 volatile Node next; //當前節點的線程 volatile Thread thread; }
該方法以獨模式獲取共享資源。若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。ReentrantLock的lock方法就是調用的該方法來獲取鎖。oop
方法的執行流程以下:ui
若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt()。this
/** * 獨佔模式獲取同步狀態,若是當前線程獲取同步狀態成功,則直接返回,不然 * 將會進入同步隊列等待,該方法會調用實現類重寫的tryAcquire(int arg)方法 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
doc翻譯:嘗試以獨佔模式獲取。 若是對象的狀態容許以獨佔模式獲取它,則此方法應查詢,若是是,則獲取它。spa
執行acquire的線程始終調用此方法。 若是此方法報告失敗,則獲取方法能夠對線程進行排隊(若是它還沒有排隊),直到它經過某個其餘線程的釋放來發出信號。 這可用於實現方法{@link Lock#tryLock()}。線程
自我理解:這個方法是須要實現類進行重寫的,用於對資源的獲取和釋放。至於能不能重入,能不能加鎖,那就看具體的自定義同步器怎麼去設計了。固然,自定義同步器在進行資源訪問時要考慮線程安全的影響。
doc翻譯:爲當前線程和給定模式建立並排隊節點。
自我理解:CLH隊列入列無非就是tail指向新節點、新節點的prev指向當前最後的節點,當前最後一個節點的next指向當前節點。代碼咱們能夠看看addWaiter(Node node)方法
/** * 將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 首先嚐試在鏈表的後面快速添加節點 Node pred = tail; if (pred != null) { node.prev = pred; // 將該節點添加到隊列尾部 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是首節點爲空或者cas添加失敗,則進入enq方法經過自旋方式入隊列,確保必定成功,這是一個保底機制 enq(node); return node; }
doc翻譯:將節點插入隊列,必要時進行初始化
自我理解:addWaiter(Node node)先經過快速嘗試設置尾節點,若是失敗,則調用enq(Node node)方法設置尾節點。在enq(Node node)方法中,AQS經過自旋鎖的方式來保證節點能夠正確添加,只有成功添加後,當前線程纔會從該方法返回,不然會一直執行下去
/** * 將node加入隊尾 */ private Node enq(final Node node) { // 自旋 for (;;) { Node t = tail; // 當前沒有節點,構造一個new Node(),將head和tail指向它 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { // 當前有節點,將傳入的Node放在鏈表的最後 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
doc翻譯:對於已經在隊列中的線程,以獨佔不間斷模式獲取。 由條件等待方法使用以及獲取。
自我理解:經過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。下一步須要處理的是:進入等待狀態休息,直到其餘線程完全釋放資源後喚醒本身,本身再拿到資源,而後就能夠去幹本身想幹的事了。其實就是個排隊拿號,在等待隊列中排隊拿號,直到拿到號後再返回
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 標記等待過程當中是否被中斷過 for (;;) { final Node p = node.predecessor(); // node的前一個節點 // 若是前一個節點是head,說明當前node節點是第二個節點,接着嘗試去獲取資源 // 多是head釋放完資源喚醒本身的,固然也可能被interrupt了 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; // 返回等待過程當中是否被中斷過 } // 若是本身能夠休息了,就進入waiting狀態,直到被unpark() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; // 若是等待過程當中被中斷過,哪怕只有那麼一次,就將interrupted標記爲true } } finally { if (failed) cancelAcquire(node); } }
doc翻譯:檢查並更新沒法獲取的節點的狀態。 若是線程應該阻塞,則返回true。 這是全部獲取循環中的主要信號控制。 須要pred == node.prev。
自我理解:
此方法主要用於檢查狀態,看看本身是否真的能夠去休息了
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 若是已經告訴前驅拿完號後通知本身一下,那就能夠一邊玩蛋去了 return true; if (ws > 0) { /* * 若是前節點放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。 * 注意:那些放棄的結點,因爲被本身「加塞」到它們前邊,它們至關於造成一個無引用鏈,稍後就會被GC回收 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 若是前節點正常,那就把前節點的狀態設置成SIGNAL,告訴它拿完號後通知下。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
/** * 讓線程去休息,真正進入等待狀態 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 調用park()使線程進入waiting狀態 return Thread.interrupted(); // 若是被喚醒,查看是否被中斷(該方法會重置標識位) }
acquireQueued總共作了3件事:
上一張流程圖看看吧
此方法是獨佔模式下線程釋放資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源
/** * 釋放資源 */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 喚醒等待隊列裏的下一個線程 return true; } return false; }
跟tryAcquire()同樣,這個方法是須要獨佔模式的自定義同步器去實現的。正常來講,tryRelease()都會成功的,由於這是獨佔模式,該線程來釋放資源,那麼它確定已經拿到獨佔資源了,直接減掉相應量的資源便可(state-=arg),也不須要考慮線程安全的問題。但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!因此自義定同步器在實現時,若是已經完全釋放資源(state=0),要返回true,不然返回false。
private void unparkSuccessor(Node node) { // 這裏,node通常爲當前線程所在的結點。 int ws = node.waitStatus; if (ws < 0) // 置零當前線程所在的結點狀態,容許失敗。 compareAndSetWaitStatus(node, ws, 0); // 找到下一個須要喚醒的結點s Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); // 喚醒 }
在AQS中維護着一個FIFO的同步隊列,當線程獲取同步狀態失敗後,則會加入到這個CLH同步隊列的對尾並一直保持着自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅節點是否爲首節點,若是爲首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步隊列。當線程執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。
doc翻譯:以共享模式獲取,忽略中斷。 經過首先調用{@link #tryAcquireShared}來實現,成功返回。 不然線程排隊,可能反覆阻塞和解除阻塞,調用{@link #tryAcquireShared}直到成功。
簡單點說就是這個方法會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared()依然須要自定義實現類去實現。可是AQS已經把其返回值的語義定義好了:負值表明獲取失敗;0表明獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其餘線程還能夠去獲取。
//共享式獲取同步狀態 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
doc翻譯:以共享不間斷模式獲取
此方法用於將當前線程加入等待隊列尾部休息,直到其餘線程釋放資源喚醒本身,本身成功拿到相應量的資源後才返回。
private void doAcquireShared(int arg) { //隊列尾部添加共享模式的節點 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //獲取上一個節點,若是上一個節點時head,嘗試獲取資源 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r);//成功有剩餘資源,將head指向本身,喚醒以後的線程 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
設置隊列頭,並檢查後繼者是否在共享模式下等待,若是是傳播,若是傳播> 0或PROPAGATE狀態已設置。
這個方法除了從新標記head指向的節點外,還有一個重要的做用,那就是propagate(傳遞),
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
共享模式的釋放操做 - 發出後續信號並確保傳播。 (注意:對於獨佔模式,若是須要信號,只需調用數量來調用head的unparkSuccessor。)
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
以共享模式發佈。, 若是{@link #tryReleaseShared}返回true,則經過解除阻塞一個或多個線程來實現。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
掃描二維碼關注「熊英的小屋」
這裏永遠有一個位置爲你開放