簡單解釋一下J.U.C,是JDK中提供的併發工具包,
java.util.concurrent
。裏面提供了不少併發編程中很經常使用的實用工具類,好比atomic原子操做、好比lock同步鎖、fork/join等。
我想以lock做爲切入點來說解AQS,畢竟同步鎖是解決線程安全問題的通用手段,也是咱們工做中用得比較多的方式。java
Lock是一個接口,方法定義以下node
void lock() // 若是鎖可用就得到鎖,若是鎖不可用就阻塞直到鎖釋放 void lockInterruptibly() // 和 lock()方法類似, 但阻塞的線程可中斷,拋出 java.lang.InterruptedException異常 boolean tryLock() // 非阻塞獲取鎖;嘗試獲取鎖,若是成功返回true boolean tryLock(long timeout, TimeUnit timeUnit) //帶有超時時間的獲取鎖方法 void unlock() // 釋放鎖
實現Lock接口的類有不少,如下爲幾個常見的鎖實現編程
讀和讀不互斥、讀和寫互斥、寫和寫互斥
。也就是說涉及到影響數據變化的操做都會存在互斥。如何在實際應用中使用ReentrantLock呢?咱們經過一個簡單的demo來演示一下api
public class Demo { private static int count=0; static Lock lock=new ReentrantLock(); public static void inc(){ lock.lock(); try { Thread.sleep(1); count++; } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } }
這段代碼主要作一件事,就是經過一個靜態的incr()
方法對共享變量count
作連續遞增,在沒有加同步鎖的狀況下多線程訪問這個方法必定會存在線程安全問題。因此用到了ReentrantLock
來實現同步鎖,而且在finally語句塊中釋放鎖。
那麼我來引出一個問題,你們思考一下安全
多個線程經過lock競爭鎖時,當競爭失敗的鎖是如何實現等待以及被喚醒的呢?
aqs全稱爲AbstractQueuedSynchronizer,它提供了一個FIFO隊列,能夠當作是一個用來實現同步鎖以及其餘涉及到同步功能的核心組件,常見的有:ReentrantLock、CountDownLatch等。
AQS是一個抽象類,主要是經過繼承的方式來使用,它自己沒有實現任何的同步接口,僅僅是定義了同步狀態的獲取以及釋放的方法來提供自定義的同步組件。
能夠這麼說,只要搞懂了AQS,那麼J.U.C中絕大部分的api都能輕鬆掌握。數據結構
從使用層面來講,AQS的功能分爲兩種:獨佔和共享多線程
仍然以ReentrantLock爲例,來分析AQS在重入鎖中的使用。畢竟單純分析AQS沒有太多的含義。先理解這個類圖,能夠方便咱們理解AQS的原理
架構
AQS的實現依賴內部的同步隊列,也就是FIFO的雙向隊列,若是當前線程競爭鎖失敗,那麼AQS會把當前線程以及等待狀態信息構形成一個Node加入到同步隊列中,同時再阻塞該線程。當獲取鎖的線程釋放鎖之後,會從隊列中喚醒一個阻塞的節點(線程)。
併發
AQS隊列內部維護的是一個FIFO的雙向鏈表,這種結構的特色是每一個數據結構都有兩個指針,分別指向直接的後繼節點和直接前驅節點。因此雙向鏈表能夠從任意一個節點開始很方便的訪問前驅和後繼。每一個Node實際上是由線程封裝,當線程爭搶鎖失敗後會封裝成Node加入到ASQ隊列中去
Node類的組成以下函數
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; //前驅節點 volatile Node next; //後繼節點 volatile Thread thread;//當前線程 Node nextWaiter; //存儲在condition隊列中的後繼節點 //是否爲共享鎖 final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } //將線程構形成一個Node,添加到等待隊列 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //這個方法會在Condition隊列使用,後續單獨寫一篇文章分析condition Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
當出現鎖競爭以及釋放鎖的時候,AQS同步隊列中的節點會發生變化,首先看一下添加節點的場景。
這裏會涉及到兩個變化
head節點表示獲取鎖成功的節點,當頭結點在釋放同步狀態時,會喚醒後繼節點,若是後繼節點得到鎖成功,會把本身設置爲頭結點,節點的變化過程以下
這個過程也是涉及到兩個變化
這裏有一個小的變化,就是設置head節點不須要用CAS,緣由是設置head節點是由得到鎖的線程來完成的,而同步鎖只能由一個線程得到,因此不須要CAS保證,只須要把head節點設置爲原首節點的後繼節點,而且斷開原head節點的next引用便可
清楚了AQS的基本架構之後,咱們來分析一下AQS的源碼,仍然以ReentrantLock爲模型。
調用ReentrantLock中的lock()方法,源碼的調用過程我使用了時序圖來展示
從圖上能夠看出來,當鎖獲取失敗時,會調用addWaiter()方法將當前線程封裝成Node節點加入到AQS隊列,基於這個思路,咱們來分析AQS的源碼實現
public void lock() { sync.lock(); }
這個是獲取鎖的入口,調用sync這個類裏面的方法,sync是什麼呢?
abstract static class Sync extends AbstractQueuedSynchronizer
sync是一個靜態內部類,它繼承了AQS這個抽象類,前面說過AQS是一個同步工具,主要用來實現同步控制。咱們在利用這個工具的時候,會繼承它來實現同步控制功能。
經過進一步分析,發現Sync這個類有兩個具體的實現,分別是NofairSync(非公平鎖)
,FailSync(公平鎖)
.
公平鎖和非公平鎖的實現上的差別,我會在文章後面作一個解釋,接下來的分析仍然以非公平鎖
做爲主要分析邏輯。
final void lock() { if (compareAndSetState(0, 1)) //經過cas操做來修改state狀態,表示爭搶鎖的操做 setExclusiveOwnerThread(Thread.currentThread());//設置當前得到鎖狀態的線程 else acquire(1); //嘗試去獲取鎖 }
這段代碼簡單解釋一下
compareAndSetState
compareAndSetState的代碼實現邏輯以下
// See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
這段代碼其實邏輯很簡單,就是經過cas樂觀鎖的方式來作比較並替換。上面這段代碼的意思是,若是當前內存中的state的值和預期值expect相等,則替換爲update。更新成功返回true,不然返回false. 這個操做是原子的,不會出現線程安全問題,這裏面涉及到Unsafe這個類的操做,一級涉及到state這個屬性的意義。 **state**
private volatile int state;須要注意的是:不一樣的AQS實現,state所表達的含義是不同的。
Unsafe
Unsafe類是在sun.misc包下,不屬於Java標準。可是不少Java的基礎類庫,包括一些被普遍使用的高性能開發庫都是基於Unsafe類開發的,好比Netty、Hadoop、Kafka等;Unsafe可認爲是Java中留下的後門,提供了一些低層次操做,如直接內存訪問、線程調度等public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);這個是一個native方法, 第一個參數爲須要改變的對象,第二個爲偏移量(即以前求出來的headOffset的值),第三個參數爲期待的值,第四個爲更新後的值
整個方法的做用是若是當前時刻的值等於預期值var4相等,則更新爲新的指望值 var5,若是更新成功,則返回true,不然返回false;
acquire是AQS中的方法,若是CAS操做未能成功,說明state已經不爲0,此時繼續acquire(1)操做,這裏你們思考一下,acquire方法中的1的參數是用來作什麼呢?若是沒猜中,往前面回顧一下state這個概念
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這個方法的主要邏輯是
若是你們看過我寫的 Synchronized源碼分析的文章,就應該可以明白自旋存在的意義
這個方法的做用是嘗試獲取鎖,若是成功返回true,不成功返回false
它是重寫AQS類中的tryAcquire方法,而且你們仔細看一下AQS中tryAcquire方法的定義,並無實現,而是拋出異常。按照通常的思惟模式,既然是一個不實現的模版方法,那應該定義成abstract,讓子類來實現呀?你們想一想爲何
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
tryAcquire(1)在NonfairSync中的實現代碼以下
ffinal boolean nonfairTryAcquire(int acquires) { //得到當前執行的線程 final Thread current = Thread.currentThread(); int c = getState(); //得到state的值 if (c == 0) { //state=0說明當前是無鎖狀態 //經過cas操做來替換state的值改成1,你們想一想爲何要用cas呢? //理由是,在多線程環境中,直接修改state=1會存在線程安全問題,你猜到了嗎? if (compareAndSetState(0, acquires)) { //保存當前得到鎖的線程 setExclusiveOwnerThread(current); return true; } } //這段邏輯就很簡單了。若是是同一個線程來得到鎖,則直接增長重入次數 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //增長重入次數 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
當tryAcquire方法獲取鎖失敗之後,則會先調用addWaiter將當前線程封裝成Node,而後添加到AQS隊列
private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE //將當前線程封裝成Node,而且mode爲獨佔鎖 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail是AQS的中表示同步隊列隊尾的屬性,剛開始爲null,因此進行enq(node)方法 Node pred = tail; if (pred != null) { //tail不爲空的狀況,說明隊列中存在節點數據 node.prev = pred; //講當前線程的Node的prev節點指向tail if (compareAndSetTail(pred, node)) {//經過cas講node添加到AQS隊列 pred.next = node;//cas成功,把舊的tail的next指針指向新的tail return node; } } enq(node); //tail=null,將node添加到同步隊列中 return node; }
enq就是經過自旋操做把當前節點加入到隊列中
private Node enq(final Node node) { //自旋,不作過多解釋,不清楚的關注公衆號[架構師修煉寶典] for (;;) { Node t = tail; //若是是第一次添加到隊列,那麼tail=null if (t == null) { // Must initialize //CAS的方式建立一個空的Node做爲頭結點 if (compareAndSetHead(new Node())) //此時隊列中只一個頭結點,因此tail也指向它 tail = head; } else { //進行第二次循環時,tail不爲null,進入else區域。將當前線程的Node結點的prev指向tail,而後使用CAS將tail指向Node node.prev = t; if (compareAndSetTail(t, node)) { //t此時指向tail,因此能夠CAS成功,將tail從新指向Node。此時t爲更新前的tail的值,即指向空的頭結點,t.next=node,就將頭結點的後續結點指向Node,返回頭結點 t.next = node; return t; } } } }
假若有兩個線程t1,t2同時進入enq方法,t==null表示隊列是首次使用,須要先初始化
另一個線程cas失敗,則進入下次循環,經過cas操做將node添加到隊尾
到目前爲止,經過addwaiter方法構造了一個AQS隊列,而且將線程添加到了隊列的節點中
將添加到隊列中的Node做爲參數傳入acquireQueued方法,這裏面會作搶佔鎖的操做
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();// 獲取prev節點,若爲null即刻拋出NullPointException if (p == head && tryAcquire(arg)) {// 若是前驅爲head纔有資格進行鎖的搶奪 setHead(node); // 獲取鎖成功後就不須要再進行同步操做了,獲取鎖成功的線程做爲新的head節點 //凡是head節點,head.thread與head.prev永遠爲null, 可是head.next不爲null p.next = null; // help GC failed = false; //獲取鎖成功 return interrupted; } //若是獲取鎖失敗,則根據節點的waitStatus決定是否須要掛起線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 若前面爲true,則執行掛起,待下次喚醒的時候檢測中斷的標誌 interrupted = true; } } finally { if (failed) // 若是拋出異常則取消鎖的獲取,進行出隊(sync queue)操做 cancelAcquire(node); } }
前面的邏輯都很好理解,主要看一下shouldParkAfterFailedAcquire這個方法和parkAndCheckInterrupt的做用
從上面的分析能夠看出,只有隊列的第二個節點能夠有機會爭用鎖,若是成功獲取鎖,則此節點晉升爲頭節點。對於第三個及之後的節點,if (p == head)條件不成立,首先進行shouldParkAfterFailedAcquire(p, node)操做
shouldParkAfterFailedAcquire方法是判斷一個爭用鎖的線程是否應該被阻塞。它首先判斷一個節點的前置節點的狀態是否爲Node.SIGNAL,若是是,是說明此節點已經將狀態設置-若是鎖釋放,則應當通知它,因此它能夠安全的阻塞了,返回true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前繼節點的狀態 if (ws == Node.SIGNAL)//若是是SIGNAL狀態,意味着當前線程須要被unpark喚醒 return true; 若是前節點的狀態大於0,即爲CANCELLED狀態時,則會從前節點開始逐步循環找到一個沒有被「CANCELLED」節點設置爲當前節點的前節點,返回false。在下次循環執行shouldParkAfterFailedAcquire時,返回true。這個操做實際是把隊列中CANCELLED的節點剔除掉。 if (ws > 0) {// 若是前繼節點是「取消」狀態,則設置 「當前節點」的 「當前前繼節點」 爲 「‘原前繼節點'的前繼節點」。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 若是前繼節點爲「0」或者「共享鎖」狀態,則設置前繼節點爲SIGNAL狀態。 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若是shouldParkAfterFailedAcquire返回了true,則會執行:parkAndCheckInterrupt()
方法,它是經過LockSupport.park(this)將當前線程掛起到WATING狀態,它須要等待一箇中斷、unpark方法來喚醒它,經過這樣一種FIFO的機制的等待,來實現了Lock的操做。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
LockSupport
LockSupport類是Java6引入的一個類,提供了基本的線程同步原語。LockSupport其實是調用了Unsafe類裏的函數,歸結到Unsafe裏,只有兩個函數:public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time);unpark函數爲線程提供「許可(permit)」,線程調用park函數則等待「許可」。這個有點像信號量,可是這個「許可」是不能疊加的,「許可」是一次性的。
permit至關於0/1的開關,默認是0,調用一次unpark就加1變成了1.調用一次park會消費permit,又會變成0。 若是再調用一次park會阻塞,由於permit已是0了。直到permit變成1.這時調用unpark會把permit設置爲1.每一個線程都有一個相關的permit,permit最多隻有一個,重複調用unpark不會累積
加鎖的過程分析完之後,再來分析一下釋放鎖的過程,調用release方法,這個方法裏面作兩件事,1,釋放鎖 ;2,喚醒park的線程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
這個動做能夠認爲就是一個設置鎖狀態的操做,並且是將狀態減掉傳入的參數值(參數是1),若是結果狀態爲0,就將排它鎖的Owner設置爲null,以使得其它的線程有機會進行執行。
在排它鎖中,加鎖的時候狀態會增長1(固然能夠本身修改這個值),在解鎖的時候減掉1,同一個鎖,在能夠重入後,可能會被疊加爲二、三、4這些值,只有unlock()的次數與lock()的次數對應纔會將Owner線程設置爲空,並且也只有這種狀況下才會返回true。
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 這裏是將鎖的數量減1 if (Thread.currentThread() != getExclusiveOwnerThread())// 若是釋放的線程和獲取鎖的線程不是同一個,拋出非法監視器狀態異常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 因爲重入的關係,不是每次釋放鎖c都等於0, // 直到最後一次釋放鎖時,纔會把當前線程釋放 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
在方法unparkSuccessor(Node)中,就意味着真正要釋放鎖了,它傳入的是head節點(head節點是佔用鎖的節點),當前線程被釋放以後,須要喚醒下一個節點的線程
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {//判斷後繼節點是否爲空或者是不是取消狀態, s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //而後從隊列尾部向前遍歷找到最前面的一個waitStatus小於0的節點, 至於爲何從尾部開始向前遍歷,由於在doAcquireInterruptibly.cancelAcquire方法的處理過程當中只設置了next的變化,沒有設置prev的變化,在最後有這樣一行代碼:node.next = node,若是這時執行了unparkSuccessor方法,而且向後遍歷的話,就成了死循環了,因此這時只有prev是穩定的 s = t; } //內部首先會發生的動做是獲取head節點的next節點,若是獲取到的節點不爲空,則直接經過:「LockSupport.unpark()」方法來釋放對應的被掛起的線程,這樣一來將會有一個節點喚醒後繼續進入循環進一步嘗試tryAcquire()方法來獲取鎖 if (s != null) LockSupport.unpark(s.thread); //釋放許可 }
經過這篇文章基本將AQS隊列的實現過程作了比較清晰的分析,主要是基於非公平鎖的獨佔鎖實現。在得到同步鎖時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,而後喚醒頭節點的後繼節點。