#前言java
很久沒寫blog了,最近學車,時間真的少了不少,花了一些時間寫了一篇AQS,請你們指正。node
翻閱AbstractQueuedSynchronizer的源碼,會發現以下注釋:併發
Pprovides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
AbstractQueuedSynchronizer提供一個基於FIFO隊列的框架,該框架用於實現阻塞鎖和相關同步器(例如:semaphores)。框架
如此可知,AbstractQueuedSynchronizer能夠視爲JDK同步器的框架,理解它,有助於理解JDK的同步器。ide
本人依據JDK源碼中的註釋結合併發經驗,總結了以下AQS框架說明:工具
AQS是依賴狀態進行同步操做的,其內部使用一個整形變量state,來表示同步狀態,此狀態值依據具體的同步器語義實現。例如:在CountDownLatch中state即爲須要等待的線程數。ui
AQS的子類必須定義在獲取和釋放上對應的狀態值。對於AQS狀態變量的操做必須使用getState,setState,compareAndSetState 三個原子方法。線程
AQS 提供了互斥與共享兩種模式,AbstractQueuedSynchronizer類中的final方法已完善隊列和阻塞機制,子類僅須要實現protected方法,設計
AQS的子類應該被定義爲非公共的內部助手類,用於實現它們的封閉類的同步屬性code
AQS在序列化時僅序列化狀態,在默認狀況下會獲得一個空的線程隊列。子類一般須要實現readObject方法,用來設置初始狀態。
hasQueuedPredecessors在設計公平的同步器時使用,若是該方法返回true,公平的同步器tryAcquire方法應該返回false
ConditionObject AQS的內部類,子類能夠用ConditionObject實現條件謂詞,若不須要實現條件謂詞能夠不實現。
//JDK中的源碼 public final void acquire(int state) { if (!tryAcquire(state) && acquireQueued(addWaiter(Node.EXCLUSIVE), state)) selfInterrupt(); }
其對應代碼的語義爲:
while (!獲取不成功) { 若是當前線程不在隊列中, 加入隊列 阻塞當前線程 } 即阻塞直到獲取成功。
//JDK中的源碼 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
對應代碼的語義爲:
if (嘗試釋放成功) 解鎖隊列中的第一個線程
若是當前節點爲隊列中的第一個節點,嘗試獲取,獲取成功進行head後續節點的設置。如獲取失敗維護先後節點關係,若須要阻塞進行阻塞,以後繼續重試。 若出現異常獲取失敗,取消當前節點獲取操做。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//嘗試獲取失敗 doAcquireShared(arg);//進行共享式獲取 } /** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) {//當前節點的先驅節點爲head,即當前節點爲第一個 int r = tryAcquireShared(arg); if (r >= 0) {//嘗試獲取成功 //向上冒泡,保證head節點的後驅節點爲未獲取到的節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //包裝爲 獲取失敗的節點 若須要中斷進行中斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
確保級聯釋放,即便有其餘的線程正在進行的獲取/釋放。 這個過程一般嘗試釋放head的後續節點,若是他須要被釋放。 若是該節點不須要,會向下傳遞釋放動做,直到釋放成功。 此外,咱們必須在添加新節點時進行循環處理。不一樣於其餘操做 中釋放後續節點,咱們須要知道CAS是否重置了狀態,因此咱們須要重複檢查。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { /* */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 循環檢查狀態 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // 循環檢查CAS } if (h == head) // 循環檢查是否有新節點 break; } }
在不可重入鎖Mutex中 ,咱們使用state=0表示釋放,state=1表示獲取
class Mutex implements Lock, java.io.Serializable { // 內部助手同步類Sync private static class Sync extends AbstractQueuedSynchronizer { // 當state=1表示獲取了獨佔鎖 protected boolean isHeldExclusively() { return getState() == 1; } // 若是state=0,鎖是釋放狀態,嘗試獲取 public boolean tryAcquire(int acquires) { assert acquires == 1; // acquires爲1表示進行獲取操做,其餘值無效 if (compareAndSetState(0, 1)) {//CAS操做 setExclusiveOwnerThread(Thread.currentThread());//設置鎖的持有者爲當前線程 return true; } return false; } //嘗試釋放 protected boolean tryRelease(int releases) { assert releases == 1; // 傳入的值爲1表示進行釋放,其餘值無效 if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0);//設置狀態爲0,表示鎖已釋放 return true; } // 提供一個條件謂詞 Condition newCondition() { return new ConditionObject(); } // 反序列化屬性 private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); //設置初始狀態爲釋放 } } // 全部同步操做 委託給Sync,下面咱們實現必要的鎖須要的操做 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
state=0表示未被通知(等待中,不可共享獲取),state!=0表示被通知(可共享獲取)
class BooleanLatch { //內部同步器,state=0表示未被通知(等待中,不可共享獲取),state!=0表示被通知(可共享獲取) private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } /** *tryAcquireShared 返回負值 獲取失敗 *0 獲取成功其餘線程不能獲取 *正值獲取成功,其餘線程也可獲取成功 / protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
分析JDK中的同步類,除了瞭解AQS外,還要知道每一個同步器中的state的語義是什麼,AQS上邊已經分析了,下面介紹下幾個同步器的state的語義。
ReentrantLock 只支持獨佔方式的獲取操做,它實現了tryAcquire,tryRelease和isHeldExclusively.
ReentrantLock的狀態用於存儲鎖獲取的操做次數,同一線程每獲取一次加1,每釋放一次減小1.
tryAcquire代碼簡要分析
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // 當前狀態值(即鎖獲取的操做)>0,鎖的全部者非當前線程,獲取失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT)//若是狀態值飽和,獲取失敗,即超過最大可獲取線程數 throw new Error("Maximum lock count exceeded"); //符合獲取鎖的條件,更新狀態值, setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //設置鎖的持有者爲當前線程 setExclusiveOwnerThread(current); return true; }
CountDownLatch同步狀態保存當前的計數值。相似BooleanLatch,不作分析。
Semaphore的同步狀態用於存儲當前能夠許可的數量。
Semaphore中的tryAcquireShared,tryReleaseShared
tryAcquireShared,獲取當前可用許可數量,若可用許可數量大於申請數量,經過compareAndSetState設置新的剩餘許可數量,不然獲取失敗。
tryReleaseShared獲取當前可用許可數量,若是當前剩餘許可數量+釋放數量>0,過compareAndSetState設置新的剩餘許可數量,不然獲取失敗。
/** *tryAcquireShared,獲取當前可用許可數量,若可用許可數量大於申請數量,經過compareAndSetState設置新的剩餘許可數量,不然獲取失敗。 */ final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /** * *tryReleaseShared獲取當前可用許可數量,若是當前剩餘許可數量+釋放數量>0,過compareAndSetState設置新的剩餘許可數量,不然獲取失敗。 */ protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
FutueTask的同步器狀態值以下:
NEW = 0; //初始狀態 COMPLETING = 1; //運行中 NORMAL = 2; //完成 EXCEPTIONAL = 3; //異常 CANCELLED = 4; //已取消 INTERRUPTING = 5; //中斷中 INTERRUPTED = 6; //已中斷
可能的狀態轉換
NEW(初始狀態) -> COMPLETING(運行中) -> NORMAL (已完成)
NEW(初始狀態) -> COMPLETING(運行中) -> EXCEPTIONAL (異常)
NEW (初始狀態)-> CANCELLED (已取消)
NEW (初始狀態)-> INTERRUPTING (中斷中)-> INTERRUPTED (已中斷)
Future.get的語義很是相似閉鎖,若是發生了某件事件(由FutureTask表示的任務執行完成 或者取消),那麼線程能夠恢復執行,不然一致阻塞。
AQS是JDK併發的框架,仔細理解有助於理解JDK的同步工具。 對於JDK的部分同步類,進行了簡要說明,詳細自行查閱源碼。 對於JDK同步類的源碼建議進行以下步驟: 1.理解同步器的狀態值的語義 2.該同步器使用是AQS的什麼模式, 是共享,互斥,仍是共享與互斥都有。 3.優先理解同步器的tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared方法,以後查看其它方法。