一、什麼是aqshtml
aqs是一個FIFO的雙向鏈表隊列。aqs將等待獲取鎖的線程封裝成結點,放在隊列中。node
咱們能夠將aqs的做用理解爲在多線程的環境下保證線程等待獲取鎖(添加進入隊列)以及線程獲取鎖,並隊列中出去都是線程安全的。數組
更簡單的能夠理解爲aqs爲了保證在多線程的環境下入隊列和出隊列的線程安全性提供了一個基本功能框架。安全
二、aqs是如何作到線程安全的多線程
aqs主要是經過cas + 死循環以及state狀態值,來作到線程安全。app
三、aqs爲何會被設計爲FIFO雙向鏈表隊列(如下是我的理解)
①aqs的鎖實現,包含公平鎖和非公平鎖。爲了實現公平鎖,必須使用隊列來保證獲取鎖的順序(入隊列的順序)框架
②用鏈表的方式,主要是由於,操做更可能是刪除與增長。鏈表時間複雜度O(1)的效率會比數組O(n)的低。學習
③用雙向隊列的緣由是,aqs的設計思想,或則說爲了解決羊羣效應(爲了爭奪鎖,大量線程同時被喚醒)。每一個結點(線程)只須要關心本身的前一個結點的狀態(後續會說),線程喚醒也只喚醒隊頭等待線程。ui
請參考 http://www.importnew.com/2400...this
四、aqs是如何提供一個基礎框架的
aqs 經過模板設計進行提供的,實現類只需實現特定的方法便可。
如下是aqs的模板方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 。。。 其餘的省略了 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
tryAcquire(int arg),tryRelease(int arg) 是咱們要實現的模板方法,固然還有分享鎖的,這裏只介紹了獨佔鎖的。
五、從源碼角度剖析aqs。aqs是如何經過雙向鏈表隊列,cas,state狀態值,以及結點狀態來保證入隊列出隊列的線程安全的!
注:如下只介紹獨佔式的不公平鎖
①aqs 如何獲取鎖?
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg) 內部調用了nonfairTryAcquire(int acquires)
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 鎖未被獲取 // cas(自旋) 獲取鎖,並修改state 狀態值 if (compareAndSetState(0, acquires)) { // 設置當前佔有的線程 setExclusiveOwnerThread(current); return true; } } // 重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
解釋:利用cas自旋式的獲取鎖。
②aqs 獲取鎖失敗,如何處理?
在看代碼前,先解釋一下:將當前線程包裝成Node結點,並插入同步隊列中,並用CAS形式嘗試獲取鎖,獲取失敗,則掛起當前線程(以上只是說了大概)
先看第1個方法(將當前線程包裝成Node結點,並插入同步隊列)
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { // 尾節點不爲空 node.prev = pred; // 用 CAS 將當前線程插入隊尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾節點爲空,說明當前隊列仍是空的,須要初始化 enq(node); return node; } private Node enq(final Node node) { // 死循環 for (;;) { // 初始化 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 這裏主要是擔憂有多個線程同時進到enq(final Node node) 方法 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
解釋:隊列若爲空,先初始化,不爲空,用 CAS 將當前結點插入到隊尾
再看第二個方法final boolean acquireQueued(final Node node, int arg);
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取前置結點 final Node p = node.predecessor(); // 前置結點是頭結點,則嘗試獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 前置結點不是頭結點 或者 前置結點是頭結點可是嘗試獲取鎖失敗 // 則,應當將當前線程掛起(畢竟不能一直死循環獲取吧~) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 當前線程的前置節點的狀態!!! // 第waitStatus 初始化值爲0, // 也所以當第1次進到這個方法時,會將前置結點的狀態置爲 Node.SIGNAL。 // 第 2次進來的時候,前置節點的waitStatus的狀態就爲 Node.SIGNAL)。 // 也就是說。aqs 只會讓你嘗試2次,都失敗後,就會被掛起 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 線程被掛起調用該方法!! private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
讓咱們總結一下,以及再回顧一下,爲何aqs會被設計爲雙向鏈表隊列。
aqs爲了保證結點(即線程)的入隊列的安全。採用了CAS 以及死循環的方式(從代碼中可看到,到處使用CAS)。
上面有說到,一個線程是否該被喚醒或者其餘操做,只須要看前置結點的狀態便可。從shouldParkAfterFailedAcquire() 方法就能夠看出這個設計。當前線程該作什麼操做,是看前置結點的狀態的。
③aqs如何釋放鎖
看代碼前,先解釋一下,aqs是如何作的。aqs的作法就是,釋放當前鎖,而後喚醒頭結點的後繼結點,若是後繼結點爲空,或者是被取消的,則從尾節點向前尋找一個未被取消的結點
public final boolean release(int arg) { // 嘗試釋放鎖 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 喚醒後繼結點 unparkSuccessor(h); return true; } return false; }
①ReentratLock 是如何實現鎖的釋放的
注:這裏看的是ReentrantLock的實現
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
解釋:設置 state 的狀態,若是 state == 0, 那麼說明鎖被釋放了。不然鎖還未被釋放(鎖重入!)
②aqs 如何喚醒其餘結點
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 清除狀態,還記得等待的線程會把前置節點的狀態置爲 Node.SIGNAL(-1)嗎 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 正常狀況下,下一個結點就是被喚醒的節點。 // 可是若是下一個結點爲null, 或者是被取消的 // 那麼從尾節點向前查找一個未被取消的節點喚醒。 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒 LockSupport.unpark(s.thread); }
release的釋放比較簡單。仍是能夠看到,aqs被設計成雙向鏈表隊列的好處!!!
看源代碼,不能一會兒就扎進去看,要先明白個大概,爲何看源代碼?還不是爲了學習做者是如何設計的。細節不管誰都記不清,最主要的是知道一個總體的流程,關鍵的代碼!畢竟優秀的開源項目這麼多,難道每行代碼都看??