本文將主要講述 AbstractQueuedSynchronizer
的內部結構和實現邏輯,在看本文以前最好先了解一下 CLH
隊列鎖,AbstractQueuedSynchronizer
就是根據 CLH
隊列鎖的變種實現的,由於自己 AQS
比較複雜不容易看清楚他自己的實現邏輯,因此查看 CLH
隊列鎖的實現,能夠幫助咱們理清楚他內部的關係;關於隊列鎖的內容能夠參考 ,CLH、MCS 隊列鎖簡介 ;html
在 JDK 中除 synchronized
內置鎖外,其餘的鎖和同步組件,基本能夠分爲:java
而 AbstractQueuedSynchronizer
即同步隊列則是 Doug Lea 大神爲咱們提供的底層線程調度的封裝;AQS
自己是根據 CLH
隊列鎖實現的,這一點在註釋中有詳細的介紹,CLH、MCS 隊列鎖簡介 ;node
簡單來說,CLH
隊列鎖就是一個單項鍊表,想要獲取鎖的線程封裝爲節點添加到尾部,而後阻塞檢查前任節點的狀態 (必定要注意是前任節點,由於這樣更容易實現取消、超時等功能,同時這也是選擇 CLH 隊列鎖的緣由),而頭結點則是當前已經得到鎖的線程,其主要做用是通知後繼節點(也就是說在沒有發生競爭的狀況下,是不須要頭結點的,這一點後面會詳細分析);編程
而對於 AQS
的結構大體能夠表述爲:併發
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractQueuedSynchronizer() { } private transient volatile Node head; // 懶加載,只有在發生競爭的時候纔會初始化; private transient volatile Node tail; // 一樣懶加載; private volatile int state; // 自定義的鎖狀態,能夠用來表示鎖的個數,以實現互斥鎖和共享鎖; }
這裏的能夠直觀的看到鏈表結構的變化,其實next鏈表只是至關於遍歷的優化,而node節點的變化纔是主要的更新;ide
static final class Node { static final Node SHARED = new Node(); // 共享模式 static final Node EXCLUSIVE = null; // 互斥模式 static final int CANCELLED = 1; // 表示線程取消獲取鎖 static final int SIGNAL = -1; // 表示後繼節點須要被喚醒 static final int CONDITION = -2; // 表示線程位於條件隊列 static final int PROPAGATE = -3; // 共享模式下節點的最終狀態,確保在doReleaseShared的時候將共享狀態繼續傳播下去 /** * 節點狀態(初始爲0,使用CAS原則更新) * 互斥模式:0,SIGNAL,CANCELLED * 共享模式:0,SIGNAL,CANCELLED,PROPAGATE * 條件隊列:CONDITION */ volatile int waitStatus; volatile Node prev; // 前繼節點 volatile Node next; // 後繼節點 volatile Thread thread; // 取鎖線程 Node nextWaiter; // 模式標識,取值:SHARED、EXCLUSIVE // Used by addWaiter,用於添加同隊隊列 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // Used by Condition,同於添加條件隊列 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
根據上面的代碼和註釋已經能夠看到 AQS
爲咱們提供了兩種模式,獨佔模式和共享模式(彼此獨立能夠同時使用);其中:oop
AbstractQueuedSynchronizer.state
: 表示鎖的資源狀態,是咱們上面所說的面向用戶邏輯的部分;Node.waitStatus
: 表示節點在隊列中的狀態,是面向底層線程調度的部分;這兩個變量必定要分清楚,在後面的代碼中也很容易弄混;源碼分析
AQS 的運行邏輯能夠簡單表述爲:優化
若是你熟悉 synchronized
,應該已經發現他們的運行邏輯實際上是差很少的,都用同步隊列和條件隊列,值得注意的是這裏的條件隊列和 Condition
一一對應,可能有多個;根據上圖能夠將 AQS
提供的功能總結爲:ui
由於獨佔模式和共享模式彼此獨立能夠同時使用,因此在入隊的時候須要首先指定 Node
的類型,同時入隊的時候有競爭的可能,因此須要 CAS 入隊;
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // SHARED、EXCLUSIVE // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
代碼中註釋也說明了,此處快速嘗試入隊,是一種優化手段,由於就通常狀況而言大多數時候是沒有競爭的;失敗後在循環入隊;
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 此時head和tail才初始化 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
而對於出隊則稍微複雜一點,獨佔模式下直接出隊,由於沒有競爭;共享模式下,則須要 CAS 設置頭結點,由於可能對有多個節點同時出隊,同時還須要向後傳播狀態,保證後面的線程能夠及時得到鎖;此外還可能發生中斷或者異常出隊,此時則須要考慮頭尾的狀況,保證不會影響隊列的結構;具體內容將會在源碼中一次講解;
public class Mutex implements Lock { private final Sync sync = new Sync(); private static final int lock = 1; private static final int unlock = 0; @Override public void lock() { sync.acquire(lock); } @Override public boolean tryLock() { return sync.tryAcquire(lock); } @Override public void unlock() { sync.release(unlock); } private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean isHeldExclusively() { return getState() == lock; } @Override public boolean tryAcquire(int acquires) { if (compareAndSetState(unlock, lock)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int releases) { if (getState() == unlock) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(unlock); return true; } } }
注意代碼中特地將 AbstractQueuedSynchronizer.state
取值定爲lock\unlock
,主要是便於理解 state
的含義,在互斥鎖中能夠任意取值,固然也能夠是負數,可是通常狀況下令其表示爲鎖的資源數量(也就是0、1)和共享模式對比,比較容易理解;
對於獨佔模式取鎖而言有一共有四中方式,
tryAcquire
也就決定了,這個鎖時公平鎖/非公平鎖,可重入鎖/不重衝入鎖等;(好比上面的實例就是不可重入非公平鎖,具體分析之後還會詳細講解)流程圖:
源碼分析:
public final void acquire(int arg) { if (!tryAcquire(arg) && // 首先嚐試快速獲取鎖 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 失敗後入隊,而後阻塞獲取 selfInterrupt(); // 最後若是取鎖的有中斷,則從新設置中斷 }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 只要取鎖過程當中有一次中斷,返回時都要從新設置中斷 for (;;) { final Node p = node.predecessor(); // 一直阻塞到前繼節點爲頭結點 if (p == head && tryAcquire(arg)) { // 獲取同步狀態 setHead(node); // 設置頭結點,此時頭部不存在競爭,直接設置 // next 主要起優化做用,而且在入隊的時候next不是CAS設置 // 也就是經過next不必定能夠準確取到後繼節點,因此在喚醒的時候不能依賴next,須要反向遍歷 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && // 判斷並整理前繼節點 parkAndCheckInterrupt()) // 當循環最多第二次的時候,必然阻塞 interrupted = true; } } finally { if (failed) // 異常時取消獲取 cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { // 大於0說明,前繼節點異常或者取消獲取,直接跳過; do { node.prev = pred = pred.prev; // 跳過pred並創建鏈接 } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 標記後繼節點須要喚醒 } return false; }
其中 node.prev = pred = pred.prev;
相關的內存分析能夠查看 JAVA 連等賦值問題;
流程圖:
源碼分析:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 中斷退出 if (!tryAcquire(arg)) // 獲取同步狀態 doAcquireInterruptibly(arg); // 中斷獲取 }
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); // 加入隊尾 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && // 判斷並整理前繼節點 parkAndCheckInterrupt()) // 等待 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
流程圖:
源碼分析:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; // 超時退出 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
釋放鎖時,判斷有後繼節點須要喚醒,則喚醒後繼節點,而後退出;有喚醒的後繼節點從新設置頭結點,並標記狀態
public final boolean release(int arg) { if (tryRelease(arg)) { // 由用戶重寫,嘗試釋放 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 喚醒後繼節點 return true; } return false; }
public class ShareLock implements Lock { private Syn sync; public ShareLock(int count) { this.sync = new Syn(count); } @Override public void lock() { sync.acquireShared(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.releaseShared(1); } @Override public Condition newCondition() { throw new UnsupportedOperationException(); } private static final class Syn extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 5854536238831876527L; Syn(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count); } @Override public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; //若是新的狀態小於0 則返回值,則表示沒有鎖資源,直接返回 if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override public boolean tryReleaseShared(int retrunCount) { for (; ; ) { int current = getState(); int newCount = current + retrunCount; if (compareAndSetState(current, newCount)) { return true; } } } } }
上述代碼中的 AbstractQueuedSynchronizer.state
表示鎖的資源數,可是仍然是不可重入的;
一樣對於共享模式取鎖也有四中方式:
@Override public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; //若是新的狀態小於0 則返回值,則表示沒有鎖資源,直接返回 if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } }
須要注意的是 tryAcquireShared
方法是快速嘗試獲取鎖,並更新鎖狀態,若是失敗則必然鎖資源不足,返回負值;
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 快速獲取失敗 doAcquireShared(arg); // 阻塞獲取鎖 }
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); // 設置頭結點,並是狀況將信號傳播下去 p.next = null; // help GC if (interrupted) selfInterrupt(); // 從新設置中斷狀態 failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
// propagate 表示線程獲取鎖後,共享鎖剩餘的鎖資源 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // propagate > 0 :表示還有剩餘的資源 // h.waitStatus < 0 : 表示後繼節點須要被喚醒 // 其他還作了不少保守判斷,確保後面的節點能及時那到鎖 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); // 喚醒後繼節點 } }
根據上面的代碼能夠看到,共享模式和獨佔模式獲取鎖的主要區別:
其他的思路和獨佔模式差很少,他家能夠本身看源碼;
一樣 tryReleaseShared
是由用戶本身重寫的,這裏須要注意的是若是不能確保釋放成功(由於共享模式釋放鎖的時候可能有競爭,因此可能失敗),則在外層 Lock
接口使用的時候,就須要額外處理;
@Override public boolean tryReleaseShared(int retrunCount) { for (; ; ) { int current = getState(); int newCount = current + retrunCount; if (compareAndSetState(current, newCount)) { return true; } } }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 嘗試取鎖成功,此時鎖資源已從新設置 doReleaseShared(); // 喚醒後繼節點 return true; } return false; }
doReleaseShared
方法必然執行兩次,
最終使得頭結點的狀態必然是 PROPAGATE
;
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
public class ConditionObject implements Condition, java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; ... }
如代碼所示條件隊列是一個由 Node
組成的鏈表,注意這裏的鏈表不一樣於同步隊列,是經過 nextWaiter
鏈接的,在同步隊列中 nextWaiter
用來表示獨佔和共享模式,因此區分條件隊列的方法就有兩個:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 添加節點到條件隊列 int savedState = fullyRelease(node); // 確保釋放鎖,並喚醒後繼節點 int interruptMode = 0; while (!isOnSyncQueue(node)) { // node 不在同步隊列中 LockSupport.park(this); // 阻塞 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); // 從頭結點一次喚醒 } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && // 將節點移動到同步節點中 (first = firstWaiter) != null); }
由於篇幅有點長了,因此條件隊列講的也就相對簡單了一點,可是大致的思路仍是講了;