AQS,全稱:AbstractQueuedSynchronizer,是JDK提供的一個同步框架,內部維護着FIFO雙向隊列,即CLH同步隊列。java
AQS依賴它來完成同步狀態的管理(voliate修飾的state,用於標誌是否持有鎖)。若是獲取同步狀態state失敗時,會將當前線程及等待信息等構建成一個Node,將Node放到FIFO隊列裏,同時阻塞當前線程,當線程將同步狀態state釋放時,會把FIFO隊列中的首節的喚醒,使其獲取同步狀態state。node
不少JUC包下的鎖都是基於AQS實現的設計模式
以下腦圖:微信
Node
static final class Node { /** 共享節點 */ static final Node SHARED = new Node();
/** 獨佔節點 */ static final Node EXCLUSIVE = null;
/** 由於超時或者中斷,節點會被設置成取消狀態,被取消的節點不會參與到競爭中,會一直是取消 狀態不會改變 */ static final int CANCELLED = 1;
/** 後繼節點處於等待狀態,若是當前節點釋放了同步狀態或者被取消,會通知後繼節點,使其得以 運行 */ static final int SIGNAL = -1;
/** 節點在等待條件隊列中,節點線程等待在condition上,當其餘線程對condition調用了signal 後,該節點將會從等待隊列中進入同步隊列中,獲取同步狀態 */ static final int CONDITION = -2;
/** * 下一次共享式同步狀態獲取會無條件的傳播下去 */ static final int PROPAGATE = -3;
/** 等待狀態 */ volatile int waitStatus;
/** 前驅節點 */ volatile Node prev;
/** 後繼節點 */ volatile Node next;
/** 獲取同步狀態的線程 */ volatile Thread thread;
/** * 下一個條件隊列等待節點 */ Node nextWaiter;
final boolean isShared() { return nextWaiter == SHARED; }
final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
Node() { // Used to establish initial head or SHARED marker }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
FIFO結構圖
獨佔式同步狀態過程
/** * 獨佔式獲取同步狀態 */public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); }}
tryAcquire
嘗試去獲取鎖,獲取成功返回true,不然返回false。該方法由繼承AQS的子類本身實現。採用了模板方法設計模式。框架
如:ReentrantLock的Sync內部類,Sync的子類:NonfairSync和編輯器
FairSyncoop
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
addWaiter
private Node addWaiter(Node mode) { // 新建Node節點 Node node = new Node(Thread.currentThread(), mode); // 嘗試快速添加尾結點 Node pred = tail; if (pred != null) { node.prev = pred; // CAS方式設置尾結點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是上面添加失敗,這裏循環嘗試添加,直到添加成功爲止 enq(node); return node; }
enq
private Node enq(final Node node) { // 一直for循環,直到插入Node成功爲止 for (;;) { Node t = tail; if (t == null) { // CAS設置首節點 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // CAS設置尾結點 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued
final boolean acquireQueued(final Node node, int arg) { // 操做是否成功標誌 boolean failed = true; try { // 線程中斷標誌 boolean interrupted = false; // 不斷的自旋循環 for (;;) { // 當前節點的prev節點 final Node p = node.predecessor(); // 判斷prev是不是頭結點 && 是否獲取到同步狀態 if (p == head && tryAcquire(arg)) { // 以上條件成立,將當前節點設置成頭結點 setHead(node); // 將prev節點移除隊列中 p.next = null; // help GC failed = false; return interrupted; } // 自旋過程當中,判斷當前線程是否須要阻塞 && 阻塞當前線程而且檢驗線程中斷狀態 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 取消獲取同步狀態 cancelAcquire(node); } }
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 拿到當前節點的prev節點的等待狀態 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 若是prev的status是signal,表示當prev釋放了同步狀態或者取消了,會通知當前節 * 點,因此當前節點能夠安心的阻塞了(至關睡覺會有人叫醒他) */ return true; if (ws > 0) { /* * status > 0,表示爲取消狀態,須要將取消狀態的節點從隊列中移除 * 直到找到一個狀態不是取消的節點爲止 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 除了以上狀況,經過CAS將prev的status設置成signal */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() { // 阻塞當前線程 LockSupport.park(this); // 返回當前線程的中斷狀態 return Thread.interrupted(); }
selfInterrupt
static void selfInterrupt() { // 未獲取到同步狀態 && 線程中斷狀態是true,中斷當前線程 Thread.currentThread().interrupt(); }
釋放獨佔式同步狀態
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 若是頭結點不爲空 && 而且 頭結點狀態不爲0 // 喚醒頭結點的後繼節點 unparkSuccessor(h); return true; } return false; }
tryRelease
嘗試去釋放同步狀態,釋放成功返回true,不然返回false。該方法由繼承AQS的子類本身實現。採用了模板方法設計模式。flex
如:ReentrantLock的Sync內部類的tryRelease方法。ui
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
unparkSuccessor
private void unparkSuccessor(Node node) { /* * 獲取當前節點狀態 */ int ws = node.waitStatus;
// 若是當前節點的狀態小於0,那麼用CAS設置成0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * 獲取當前節點的後繼節點 */ Node s = node.next; // 若是後繼節點爲空 || 或者後繼節點的狀態 > 0 (爲取消狀態) if (s == null || s.waitStatus > 0) { s = null; // 從尾結點查找狀態不爲取消的可用節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒後繼節點 LockSupport.unpark(s.thread); }
總結
在AQS中維護着一個FIFO的同步隊列,當線程獲取同步狀態失敗後,則會加入到這個CLH同步隊列的對尾並一直保持着自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅節點是否爲首節點,若是爲首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步隊列。當線程執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。
this
共享式同步狀態過程
共享式與獨佔式的最主要區別在於同一時刻獨佔式只能有一個線程獲取同步狀態,而共享式在同一時刻能夠有多個線程獲取同步狀態。例如讀操做能夠有多個線程同時進行,而寫操做同一時刻只能有一個線程進行寫操做,其餘操做都會被阻塞。
acquireShared獲取同步狀態
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 獲取失敗,自旋獲取同步狀態 doAcquireShared(arg); }
tryAcquireShared
嘗試去獲取共享鎖,獲取成功返回true,不然返回false。該方法由繼承AQS的子類本身實現。採用了模板方法設計模式。
如:ReentrantReadWriteLock的Sync內部類
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
doAcquireShared
private void doAcquireShared(int arg) { // 添加共享模式節點到隊列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 自旋獲取同步狀態 for (;;) { // 當前節點的前驅 final Node p = node.predecessor(); // 若是前驅節點是head節點 if (p == head) { // 嘗試去獲取共享同步狀態 int r = tryAcquireShared(arg); if (r >= 0) { // 將當前節點設置爲頭結點,而且釋放也是共享模式的後繼節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 真正的釋放共享同步狀態,並喚醒下一個節點 doReleaseShared(); } }
doReleaseShared
private void doReleaseShared() { // 自旋釋放共享同步狀態 for (;;) { Node h = head; // 若是頭結點不爲空 && 頭結點不等於尾結點,說明存在有效的node節點 if (h != null && h != tail) { int ws = h.waitStatus; // 若是頭結點的狀態爲signal,說明存在須要喚醒的後繼節點 if (ws == Node.SIGNAL) { // 將頭結點狀態更新爲0(初始值狀態),由於此時頭結點已經沒用了 // continue爲了保證替換成功 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒後繼節點 unparkSuccessor(h); } // 若是狀態爲初始值狀態0,那麼設置成PROPAGATE狀態 // 確保在釋放同步狀態時能通知後繼節點 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
unparkSuccessor
private void unparkSuccessor(Node node) { /* * 獲取當前節點狀態 */ int ws = node.waitStatus;
// 若是當前節點的狀態小於0,那麼用CAS設置成0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * 獲取當前節點的後繼節點 */ Node s = node.next; // 若是後繼節點爲空 || 或者後繼節點的狀態 > 0 (爲取消狀態) if (s == null || s.waitStatus > 0) { s = null; // 從尾結點查找狀態不爲取消的可用節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒後繼節點 LockSupport.unpark(s.thread); }
熬夜到凌晨,終於搞定了這篇文章,晚安,老鐵們
給個[在看],是對IT老哥最大的支持
本文分享自微信公衆號 - IT老哥(dys_family)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。