每個Java工程師應該都或多或少了解過AQS,我本身也是前先後後,反反覆覆研究了好久,看了忘,忘了再看,每次都有不同的體會。此次趁着寫博客,打算從新拿出來系統的研究下它的源碼,總結成文章,便於之後複習。java
原文地址:http://www.jianshu.com/p/7144...node
AbstractQueuedSynchronizer(如下簡稱AQS)做爲java.util.concurrent包的基礎,它提供了一套完整的同步編程框架,開發人員只須要實現其中幾個簡單的方法就能自由的使用諸如獨佔,共享,條件隊列等多種同步模式。咱們經常使用的好比ReentrantLock,CountDownLatch等等基礎類庫都是基於AQS實現的,足以說明這套框架的強大之處。鑑於此,咱們開發人員更應該瞭解它的實現原理,這樣才能在使用過程當中駕輕就熟。編程
整體來講我的感受AQS的代碼很是難懂,本文就其中的獨佔鎖實現原理進行分析。併發
首先先從總體流程入手,瞭解下AQS獨佔鎖的執行邏輯,而後再一步一步深刻分析源碼。框架
獲取鎖的過程:高併發
釋放鎖過程:性能
基於上面所講的獨佔鎖獲取釋放的大體過程,咱們再來看下源碼實現邏輯:
首先來看下獲取鎖的方法acquire()學習
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
代碼雖然短,但包含的邏輯卻不少,一步一步看下:ui
//注意:該入隊方法的返回值就是新建立的節點 private Node addWaiter(Node mode) { //基於當前線程,節點類型(Node.EXCLUSIVE)建立新的節點 //因爲這裏是獨佔模式,所以節點類型就是Node.EXCLUSIVE Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //這裏爲了提搞性能,首先執行一次快速入隊操做,即直接嘗試將新節點加入隊尾 if (pred != null) { node.prev = pred; //這裏根據CAS的邏輯,即便併發操做也只能有一個線程成功並返回,其他的都要執行後面的入隊操做。即enq()方法 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } //完整的入隊操做 private Node enq(final Node node) { for (;;) { Node t = tail; //若是隊列尚未初始化,則進行初始化,即建立一個空的頭節點 if (t == null) { //一樣是CAS,只有一個線程能夠初始化頭結點成功,其他的都要重複執行循環體 if (compareAndSetHead(new Node())) tail = head; } else { //新建立的節點指向隊列尾節點,毫無疑問併發狀況下這裏會有多個新建立的節點指向隊列尾節點 node.prev = t; //基於這一步的CAS,無論前一步有多少新節點都指向了尾節點,這一步只有一個能真正入隊成功,其餘的都必須從新執行循環體 if (compareAndSetTail(t, node)) { t.next = node; //該循環體惟一退出的操做,就是入隊成功(不然就要無限重試) return t; } } } }
上面的入隊操做有兩點須要說明:
1、初始化隊列的觸發條件就是當前已經有線程佔有了鎖資源,所以上面建立的空的頭節點能夠認爲就是當前佔有鎖資源的節點(雖然它並無設置任何屬性)。
2、注意整個代碼是處在一個死循環中,知道入隊成功。若是失敗了就會不斷進行重試。this
經過上面的分析,該方法入參node就是剛入隊的包含當前線程信息的節點 final boolean acquireQueued(final Node node, int arg) { //鎖資源獲取失敗標記位 boolean failed = true; try { //等待線程被中斷標記位 boolean interrupted = false; //這個循環體執行的時機包括新節點入隊和隊列中等待節點被喚醒兩個地方 for (;;) { //獲取當前節點的前置節點 final Node p = node.predecessor(); //若是前置節點就是頭結點,則嘗試獲取鎖資源 if (p == head && tryAcquire(arg)) { //當前節點得到鎖資源之後設置爲頭節點,這裏繼續理解我上面說的那句話 //頭結點就表示當前正佔有鎖資源的節點 setHead(node); p.next = null; //幫助GC //表示鎖資源成功獲取,所以把failed置爲false failed = false; //返回中斷標記,表示當前節點是被正常喚醒仍是被中斷喚醒 return interrupted; } 若是沒有獲取鎖成功,則進入掛起邏輯 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //最後會分析獲取鎖失敗處理邏輯 if (failed) cancelAcquire(node); } }
掛起邏輯是很重要的邏輯,這裏拿出來單獨分析一下,首先要注意目前爲止,咱們只是根據當前線程,節點類型建立了一個節點並加入隊列中,其餘屬性都是默認值。
//首先說明一下參數,node是當前線程的節點,pred是它的前置節點 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取前置節點的waitStatus int ws = pred.waitStatus; if (ws == Node.SIGNAL) //若是前置節點的waitStatus是Node.SIGNAL則返回true,而後會執行parkAndCheckInterrupt()方法進行掛起 return true; if (ws > 0) { //由waitStatus的幾個取值能夠判斷這裏表示前置節點被取消 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //這裏咱們由當前節點的前置節點開始,一直向前找最近的一個沒有被取消的節點 //注,因爲頭結點head是經過new Node()建立,它的waitStatus爲0,所以這裏不會出現空指針問題,也就是說最多就是找到頭節點上面的循環就退出了 pred.next = node; } else { //根據waitStatus的取值限定,這裏waitStatus的值只能是0或者PROPAGATE,那麼咱們把前置節點的waitStatus設爲Node.SIGNAL而後從新進入該方法進行判斷 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
上面這個方法邏輯比較複雜,它是用來判斷當前節點是否能夠被掛起,也就是喚醒條件是否已經具有,即若是掛起了,那必定是能夠由其餘線程來喚醒的。該方法若是返回false,即掛起條件沒有完備,那就會從新執行acquireQueued方法的循環體,進行從新判斷,若是返回true,那就表示萬事俱備,能夠掛起了,就會進入parkAndCheckInterrupt()方法看下源碼:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //被喚醒以後,返回中斷標記,即若是是正常喚醒則返回false,若是是因爲中斷醒來,就返回true return Thread.interrupted(); }
看acquireQueued方法中的源碼,若是是由於中斷醒來,那麼就把中斷標記置爲true。不論是正常被喚醒仍是由與中斷醒來,都會去嘗試獲取鎖資源。若是成功則返回中斷標記,不然繼續掛起等待。
注:Thread.interrupted()方法在返回中斷標記的同時會清除中斷標記,也就是說當因爲中斷醒來而後獲取鎖成功,那麼整個acquireQueued方法就會返回true表示是由於中斷醒來,但若是中斷醒來之後沒有獲取到鎖,繼續掛起,因爲此次的中斷已經被清除了,下次若是是被正常喚醒,那麼acquireQueued方法就會返回false,表示沒有中斷。
最後咱們回到acquireQueued方法的最後一步,finally模塊。這裏是針對鎖資源獲取失敗之後作的一些善後工做,翻看上面的代碼,其實能進入這裏的就是tryAcquire()方法拋出異常,也就是說AQS框架針對開發人員本身實現的獲取鎖操做若是拋出異常,也作了妥善的處理,一塊兒來看下源碼:
//傳入的方法參數是當前獲取鎖資源失敗的節點 private void cancelAcquire(Node node) { // 若是節點不存在則直接忽略 if (node == null) return; node.thread = null; // 跳過全部已經取消的前置節點,跟上面的那段跳轉邏輯相似 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; //這個是前置節點的後繼節點,因爲上面可能的跳節點的操做,因此這裏可不必定就是當前節點,仔細想一下。^_^ Node predNext = pred.next; //把當前節點waitStatus置爲取消,這樣別的節點在處理時就會跳過該節點 node.waitStatus = Node.CANCELLED; //若是當前是尾節點,則直接刪除,即出隊 //注:這裏不用關心CAS失敗,由於即便併發致使失敗,該節點也已經被成功刪除 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) //這裏的判斷邏輯很繞,具體就是若是當前節點的前置節點不是頭節點且它後面的節點等待它喚醒(waitStatus小於0), //再加上若是當前節點的後繼節點沒有被取消就把前置節點跟後置節點進行鏈接,至關於刪除了當前節點 compareAndSetNext(pred, predNext, next); } else { //進入這裏,要麼當前節點的前置節點是頭結點,要麼前置節點的waitStatus是PROPAGATE,直接喚醒當前節點的後繼節點 unparkSuccessor(node); } node.next = node; // help GC } }
上面就是獨佔模式獲取鎖的核心源碼,確實很是難懂,很繞,就這幾個方法須要反反覆覆看不少遍,才能慢慢理解。
接下來看下釋放鎖的過程:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease()方法是用戶自定義的釋放鎖邏輯,若是成功,就判斷等待隊列中有沒有須要被喚醒的節點(waitStatus爲0表示沒有須要被喚醒的節點),一塊兒看下喚醒操做:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //把標記爲設置爲0,表示喚醒操做已經開始進行,提升併發環境下性能 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //若是當前節點的後繼節點爲null,或者已經被取消 if (s == null || s.waitStatus > 0) { s = null; //注意這個循環沒有break,也就是說它是從後往前找,一直找到離當前節點最近的一個等待喚醒的節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //執行喚醒操做 if (s != null) LockSupport.unpark(s.thread); }
相比而言,鎖的釋放操做就簡單不少了,代碼也比較少。
以上就是AQS獨佔鎖的獲取與釋放過程,大體思想很簡單,就是嘗試去獲取鎖,若是失敗就加入一個隊列中掛起。釋放鎖時,若是隊列中有等待的線程就進行喚醒。但若是一步一步看源碼,會發現細節很是多,不少地方很難搞明白,我本身也是反反覆覆學習好久纔有點心得,但也不敢說已經研究通了AQS,甚至不敢說我上面的研究成果就是對的,只是寫篇文章總結一下,跟同行交流交流心得。除了獨佔鎖,後面還會產出AQS一系列的文章,包括共享鎖,條件隊列的實現原理等。