在以前咱們已經對部分JDK
源碼作了介紹:java
啃碎JDK源碼(一):String
啃碎JDK源碼(二):Integer
啃碎JDK源碼(三):ArrayList
啃碎JDK源碼(四):HashMap
啃碎JDK源碼(五):ConcurrentHashMap
啃碎JDK源碼(六):LinkedListnode
今天咱們正式開始介紹juc
包下面的類,也就是和多線程打交道的地方,和鎖打交道的類用的比較的的無非就是 ReentrantLock
和 ReentrantReadWriteLock
等,可是咱們今天要介紹的是 AbstractQueuedSynchronizer
這個抽象類,也就是面試中常常被問到的 AQS
,由於不論是ReentrantLock
仍是ReentrantReadWriteLock
以及其它的一些都是基於它實現的,因此頗有必要先來了解一下。面試
AQS的全稱爲(AbstractQueuedSynchronizer),咱們能夠把它當作一個幫助咱們實現鎖的同步器,它基於FIFO(先進先出)的隊列實現的,而且內部維護了一個狀態變量 state
,經過原子更新這個狀態變量便可以實現加鎖解鎖操做。segmentfault
來看下 AbstractQueuedSynchronizer
的繼承結構多線程
能看到 ReentrantLock
等並非直接繼承 AbstractQueuedSynchronizer
,而是其內部類 Sync
。併發
接着來看看一些重要的屬性:ide
// 隊列的頭節點 private transient volatile Node head; // 隊列的尾節點 private transient volatile Node tail; // 控制加鎖解鎖的狀態變量 private volatile int state;
定義了一個狀態變量和一個隊列,狀態變量用來控制加鎖解鎖,隊列用來放置等待的線程。oop
這個 state
變量很重要,用來作狀態標識。比方說在 ReentrantLock
裏面它表示獲取鎖的線程數,假如等於0表示尚未線程獲取鎖,1表示有線程獲取了鎖。大於1表示重入鎖的數量。測試
注意,這幾個變量都要使用 volatile
關鍵字來修飾,由於是在多線程環境下操做,要保證它們的值修改以後對其它線程當即可見。ui
還有咱們的 Node
內部類
static final class Node { // 標識一個節點是共享模式 static final Node SHARED = new Node(); // 標識一個節點是互斥模式 static final Node EXCLUSIVE = null; // 標識線程已取消 static final int CANCELLED = 1; // 標識後繼節點須要喚醒 static final int SIGNAL = -1; // 標識線程等待在一個條件上 static final int CONDITION = -2; // 標識後面的共享鎖須要無條件的傳播 static final int PROPAGATE = -3; // 當前節點保存的線程對應的等待狀態 volatile int waitStatus; // 上一個結點 volatile Node prev; // 下一個結點 volatile Node next; // 當前結點保存的線程 volatile Thread thread; // 下一個等待在條件上的節點(Condition鎖時使用) Node nextWaiter; // 是不是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 獲取前一個節點 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { // 把共享模式仍是互斥模式存儲到nextWaiter這個字段裏面了 this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // 等待的狀態,在Condition中使用 this.waitStatus = waitStatus; this.thread = thread; } }
上面是一個標準的雙向鏈表結構,保存着當前線程、前一個節點、後一個節點以及線程的狀態等信息。屬性比較多,看不懂不要緊,後面用到會從新講一下。
那麼在源碼裏面是如何修改這些變量的呢?其實就是經過咱們以前說的 CAS
來修改,若是不瞭解的話請參考 一文看懂CAS
比方說 state
狀態變量的修改
// 獲取Unsafe類的實例 private static final Unsafe unsafe = Unsafe.getUnsafe(); // 狀態變量state的偏移量 private static final long stateOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); ....... } catch (Exception ex) { throw new Error(ex); } }
核心是 compareAndSetState
方法:
protected final boolean compareAndSetState(int expect, int update) { // 若是當前值等於except,那麼更新成update return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
既然 AbstractQueuedSynchronizer
是一個抽象類,那麼子類要實現哪些接口呢?
好比說用來加鎖的 tryAcquire
方法
// 互斥模式下嘗試獲取鎖 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
能夠看到只是拋出了異常,而且值得注意的是該方法並無定義成抽象方法,由於只要實現一部分方法就能夠本身手動編寫一個鎖了,定義成 protect
也是方便子類去實現,除此以外還有
// 互斥模式下嘗試釋放鎖 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 共享模式下嘗試獲取鎖 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 共享模式下嘗試釋放鎖 protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // 當前線程是否持有鎖 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
如今咱們嘗試一下基於AQS
手動實現一個鎖:
/* * 實現Lock接口 */ public class MyLock implements Lock { private final Sync sync; public MyLock() { sync = new Sync(); } // 定義一個內部類Sync繼承AbstractQueuedSynchronizer private static class Sync extends AbstractQueuedSynchronizer { // 嘗試獲取獨佔鎖 @Override protected boolean tryAcquire(int arg) { // AQS方法:CAS更新state狀態變量 if (compareAndSetState(0, 1)) { // AQS方法:設置當前線程爲持有鎖的線程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 嘗試釋放獨佔鎖 @Override protected boolean tryRelease(int arg) { if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); // AQS方法:當前線程已持有鎖,能夠直接修改state值,不須要經過CAS修改 setState(0); return true; } // 鎖是否已被釋放 @Override protected boolean isHeldExclusively() { return getState() == 1; } Condition newCondition() { return new ConditionObject(); } } @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
好了,咱們的鎖已經寫完了,核心就是使用一個靜態內部類Sync
繼承AQS
,實現加鎖、釋放鎖等方法,其實咱們熟悉的 ReentrantLock
也是這樣的實現原理,如今咱們來測試一下:
public class TestLock { private final Lock lock = new MyLock(); private volatile int count = 1; private static class WorkThread extends Thread { private final TestLock myLock; public WorkThread(TestLock myLock) { this.myLock = myLock; } @Override public void run() { myLock.execute(); } } public void execute() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "獲取到的count=" + count++); } finally { lock.unlock(); } } public static void main(String[] args) { TestLock myLock = new TestLock(); // 啓動100個線程 for (int i = 1; i <= 100; i++) { new WorkThread(myLock).start(); } } }
咱們啓動100個線程對 count
加一,看看打印結果:
能夠看到最後的確是加到了100,也就是說咱們的鎖是可用的。
可是,咱們如今的鎖是不可重入鎖,學過ReentrantLock
的同窗應該知道它是可重入鎖,也就是在線程持有鎖的狀況下能夠從新得到鎖,假如咱們改一下execute
方法:
public void execute() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "獲取到的count=" + count++); if (count == 5) { execute(); } } finally { lock.unlock(); } }
當 count == 5 時執行調用自身,看下執行結果:
能夠看到線程被阻塞了,由於當前持有鎖的線程不能從新獲取鎖,因此咱們須要對 tryAcquire
和 tryRelease
方法進行改造:
// 嘗試獲取獨佔鎖 @Override protected boolean tryAcquire(int arg) { // 獲取當前線程 Thread currentThread = Thread.currentThread(); int state = getState(); if (state == 0) { // AQS方法:CAS更新state狀態變量 if (compareAndSetState(0, 1)) { // AQS方法:設置當前線程爲持有鎖的線程 setExclusiveOwnerThread(currentThread); return true; } else if (currentThread == getExclusiveOwnerThread()) { // 由於是獨佔鎖,因此同一時刻只能有一個線程能獲取到鎖,若是當前的鎖是被當前線程獲取過了,則將狀態變量+1 int newState = state + arg; // 設置新的狀態變量 setState(newState); return true; } } return false; } // 嘗試釋放獨佔鎖 @Override protected boolean tryRelease(int arg) { // 判斷當前鎖釋放是當前線程鎖獨佔的,若是判斷不成立則拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); int newState = getState() - arg; boolean free = false; if (newState == 0) { // 若是狀態爲0了則說明當前線程能夠釋放對鎖的持有了 setExclusiveOwnerThread(null); free = true; } // AQS方法:當前線程已持有鎖,能夠直接修改state值,不須要經過CAS修改 setState(newState); return free; }
在 tryAcquire
方法主要加了判斷:若是 state
不爲0的時候,判斷當前線程是否已經和鎖綁定,已經綁定的話則將 state+1
同時返回true
在 tryRelease
方法中主要增長了釋放鎖的時候是對 state
變量逐次減一當減到0的時候纔將鎖與當前線程綁定的狀態去除,釋放鎖。
從新運行下已經不會阻塞了,若是不懂的地方看下注釋就明白了。
那麼當線程獲取不到鎖的時候是如何等待的呢?又是何時被喚醒的呢?接下來咱們一步步跟隨源碼看看到底作了什麼?
AQS
有獨佔
模式和共享
模式,首先來看看獨佔模式,看下lock
加鎖方法:
@Override public void lock() { sync.acquire(1); }
這裏能夠看到並非調用咱們重寫的 tryAcquire
方法,而是調用父類 AbstractQueuedSynchronizer
的方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
此方法是獨佔模式下線程獲取共享資源的頂層入口。若是獲取鎖成功,線程直接返回,不然進入等待隊列,直到獲取鎖爲止,且整個過程忽略中斷的影響。
首先是調用 tryAcquire
方法來獲取嘗試獲取鎖,跟過去看一下AQS
的實現:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
能夠看到該方法定義爲protect
,也就是由咱們子類去實現的,若是獲取鎖失敗的話那麼會進入等待隊列,來看看addWaiter
方法:
/* * 將當前線程添加到等待隊列的隊尾,並返回當前線程所在的節點 */ private Node addWaiter(Node mode) { // 以獨佔模式把當前線程封裝成一個Node節點 Node node = new Node(Thread.currentThread(), mode); // 嘗試將結點放到隊列尾部 Node pred = tail; if (pred != null) { node.prev = pred; // 使用CAS把node做爲尾節點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾節點爲空或者利用CAS把node設爲尾節點失敗時經過enq方法進行入隊 enq(node); return node; } /* * 採用for循環自旋的方式把node插入到隊列中 */ private Node enq(final Node node) { for (;;) { Node t = tail; // 隊列爲空,須要初始化 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq
方法中採用了很是經典的自旋
操做,只有經過CAS
把node設爲尾節點後,當前線程才能退出該方法,不然的話,當前線程不斷的嘗試,直到能把節點添加到隊列中爲止。繼續看一下acquireQueued
方法:
/* * 經過自旋獲取鎖 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 默認線程沒有被中斷過 boolean interrupted = false; for (;;) { // 獲取該節點的前驅節點 final Node p = node.predecessor(); // 若是前驅節點是頭節點而且當前線程獲取到鎖 if (p == head && tryAcquire(arg)) { // 設置當前節點爲頭節點 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 掛起當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 移除當前節點 cancelAcquire(node); } }
該方法比較複雜,咱們來仔細分析一下:
shouldParkAfterFailedAcquire
方法判斷是否須要掛起當前線程shouldParkAfterFailedAcquire
方法從名字也能看出來是當獲取鎖失敗後用來判斷是否須要掛起當前線程,實現功能簡單的講就是把當前node節點的有效前驅(有效是指waitStatus
不是CANCELLED的)找到,而且將有效前驅的狀態設置爲SIGNAL
,以後便返回true表明立刻能夠阻塞了。來看看實現代碼:
/* * 判斷是否須要掛起當前線程 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 前驅節點已經設置了SIGNAL,若是前驅變成了head,而且head的表明線程 * exclusiveOwnerThread釋放了鎖,就會來根據這個SIGNAL來喚醒本身 */ return true; if (ws > 0) { /* * 發現傳入的前驅的狀態大於0,即CANCELLED。說明前驅節點已經由於超時或響應了中斷,而取消了本身。 * 因此須要向前遍歷直到找到一個<=0的節點 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 若是是其餘狀況,那麼CAS嘗試設置前驅節點爲SIGNAL */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
在shouldParkAfterFailedAcquire
返回true的狀況下,繼續看parkAndCheckInterrupted
方法
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
調用LockSupport的 park
方法掛起當前線程,返回該線程是否被中斷過,若是被中斷過,直接設置 interrupted = true
。
(LockSupport類是Java6引入的一個類,提供了基本的線程同步原語,有興趣的小夥伴能夠去了解一下)
最後來看下 cancelAcquire
方法:
/** * 取消當前節點 */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; // 把當前節點的線程設爲 null node.thread = null; // 和前面同樣向前遍歷找到有效的前驅節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; // 把node節點的ws設爲CANCELLED node.waitStatus = Node.CANCELLED; // 若是node是尾節點,利用CAS把前驅節點設爲尾節點,後繼節點爲null方便GC if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 前驅節點不是頭結點 && 線程不爲空 && waitStatus爲singal // 利用CAS把node的next設爲pred的next節點 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // node是頭結點,喚起它的後繼節點 unparkSuccessor(node); } node.next = node; // help GC } }
最後若是acquireQueued
方法返回true,須要進行自我中斷。由於parkAndCheckInterrupted
方法不響應中斷,而且內部調用了Thread.interrupted
方法清除中斷標記位。因此當該方法返回true(被中斷)時,須要手動補償中斷標記位。
static void selfInterrupt() { Thread.currentThread().interrupt(); }
流程總結
tryAcquire()
嘗試直接去獲取鎖,若是成功則直接返回;addWaiter()
將該線程加入等待隊列的尾部,並標記爲獨佔模式;acquireQueued()
使線程在隊列中等待直到獲取鎖。若是在整個等待過程當中被中斷過,則返回true,不然返回false。selfInterrupt()
,將中斷補上。調用同步器的acquire(int arg)
方法能夠獲取同步狀態,該方法對中斷不敏感,即線程獲取同步狀態失敗後進入同步隊列,後續對線程進行中斷操做時,線程不會從同步隊列中移除。獲取流程:
tryAcquire()
方法嘗試獲取鎖,成功則直接返回,失敗則進入隊列排隊等待,經過CAS獲取同步狀態。Node.EXCLUSIVE
),經過addWaiter(Node node,int args)
方法,將節點加入到同步隊列的隊列尾部。acquireQueued(final Node node, int args)
方法,使該節點以死循環的方式獲取同步狀態,若是獲取不到,則阻塞節點中的線程。acquireQueued
方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點的時候才能嘗試獲取鎖(同步狀態)( p == head && tryAcquire(arg)
)。上面看完了加鎖的流程,接下來看看是如何釋放鎖的?
public final boolean release(int arg) { // tryRelease由子類實現 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 喚醒後繼節點 unparkSuccessor(h); return true; } return false; }
代碼比較簡單,tryRelease
和上面同樣也是有事子類去實現,若是釋放鎖成功的話那麼咱們須要調用 unparkSuccessor
方法去喚醒後繼節點,看下具體實現:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) // 將當前節點的狀態修改成0 compareAndSetWaitStatus(node, ws, 0); // 若是直接後繼爲空或者它的waitStatus大於0(已經放棄獲取鎖了),咱們就遍歷整個隊列,獲取第一個須要喚醒的節點 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒節點 LockSupport.unpark(s.thread); }
代碼比較簡單,這裏就不展開細說了。
上面講完了獨佔模式,如今來說下共享模式,所謂共享模式就是同一個時刻容許多個線程持有鎖,比方說 ReentrantReadWriteLock
就是實現了共享模式的AQS
,直接上代碼:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /* * 共享模式獲取鎖 */ private void doAcquireShared(int arg) { // 加入隊列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // head是已經拿到鎖的節點 if (p == head) { // 嘗試獲取鎖,返回的r是剩餘的資源數量,若是大於0那麼須要喚醒後續節點 int r = tryAcquireShared(arg); if (r >= 0) { // 將head指向本身,還有剩餘資源能夠再喚醒以後的線程 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
這個方法大體上看起來和獨佔模式是很相像的。區別只在於獨佔模式下,在本方法中獲取到資源後,只是將本節點設置爲head節點。而共享模式下,設置完head節點後,還須要判斷是否須要喚醒多個線程,看一下如何喚醒線程:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * 什麼狀況下要喚醒後繼結點? * 1.資源剩餘數大於0,有剩餘資源確定是要喚醒後繼結點的 * 2.頭結點不存在。 * 3.頭結點狀態小於0,意味着後繼節點要求node(也就是當前head)喚醒後繼結點 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
doReleaseShared
方法裏面纔是真正來喚醒線程:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 因爲該方法可能被併發調用,爲了不沒必要要的喚醒浪費,由於經過cas來搶佔喚醒權利。 // 搶佔成功者執行真正的後繼結點喚醒任務。若是失敗則進行重試 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 若是ws==0,多是head結點已經沒有後繼結點,也有多是由於head節點的後繼結點喚醒權被其餘線程剛搶佔成功。 // 若是沒有後繼結點,顯然不須要作任何事情 // 若是是喚醒權被其餘線程搶佔,則不須要作任何事情。由於喚醒是在隊列上進行傳播的。因此這裏就cas將head節點的狀態值修改成 PROPAGATE。用來表達該線程喚醒操做意圖已經傳達。可是會由別的線程真正的執行後續的喚醒動做。一樣,若是失敗了,則重試。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
tryAcquireShared
方法也是由子類去實現,可是AQS已經把其返回值的語義定義好了:負值表明獲取失敗;0表明獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其餘線程還能夠去獲取。因此這裏acquireShared()
的流程就是:
tryAcquireShared()
嘗試獲取資源,成功則直接返回;doAcquireShared()
進入等待隊列park()
,直到被unpark()/interrupt()
併成功獲取到資源才返回。整個等待過程也是忽略中斷的。doAcquireShared(int)
此方法用於將當前線程加入等待隊列尾部休息,直到其餘線程釋放鎖喚醒本身,本身成功拿到相應的資源後才返回。看完加鎖的方法,如今來看共享模式下的釋放鎖方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
能夠看到解鎖也是用的doReleaseShared
方法,代碼比較簡單這裏再也不展開細說。
今天有關AQS
的知識點就介紹到這裏,有什麼不對的地方請多多指教!