AQS是J.U.C包下AbstractQueuedSynchronizer抽象的隊列式的同步器的簡稱,這是一個抽象類,它定義了一套多線程訪問共享資源的同步器框架,J.U.C包下的許多同步類實現都依賴於它,好比ReentrantLock/Semaphore/CountDownLatch,能夠說這個抽象類是J.U.C併發包的基礎。java
之因此把這一章節叫作AQS簡介而不是叫AQS詳解,是由於已經有大神寫過詳解的文章Java併發之AQS詳解,這篇文章對AQS的源碼解析很透徹,博主讀了以後受益不淺,鑑於對原做者的尊重,因此如上附上原文的連接。要想弄懂AQS還得從這一圖提及。node
如上圖所述,AQS維護了一個state變量和一個FIFO先進先出隊列,這個state用來幹嗎的能夠參考我前一篇博客中的那個count計數器,就是用來計數線程的重入次數的。上一篇博客還用了一個變量currentThread來記錄已經得到這把鎖的線程。而咱們的AQS用的是一個先進先出的等待隊列的完成這件事。當新的線程進來的時候,AQS調用tryAquice()方法試圖去得到鎖,若是得到的話,則調用interupt中斷方法;若是沒有得到鎖,則把當前線程放入排隊的隊列,AQS隊列不斷的自旋嘗試去判斷已經佔用的線程是否已經放開,若是鎖依然被線程繼續佔用,則繼續添加進等待隊列。安全
源碼以下:多線程
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
那個addWaiter方法,此方法用於將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點。併發
private Node addWaiter(Node mode) { //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //嘗試快速方式直接放到隊尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失敗則經過enq入隊。 enq(node); return node; }
咱們以獨佔式的同步幫助器爲例來看一下AQS的執行流程。框架
大體流程以下:ide
調用自定義同步器的tryAcquire()嘗試直接去獲取資源,若是成功則直接返回;學習
沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;測試
acquireQueued()使線程在等待隊列中休息,有機會時(輪到本身,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。ui
若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。
上述的流程和步驟已是AQS幫咱們實現了的功能,估計我講的也不太清楚,這裏再次推薦讀者閱讀這篇文章Java併發之AQS詳解,下面咱們應該來看看如何使用AQS。
用AQS寫一把互斥鎖
互斥鎖是爲了保證數據的安全,在任一時刻只能有一個線程訪問該對象。由上一個小節咱們可知,AQS已經爲咱們實現全部排隊和阻塞機制,咱們只須要調用getState()、setState(int) 和 compareAndSetState(int, int) 方發來維護state變量的數值和調用setExclusiveOwnerThread/getExclusiveOwnerThread來維護當前佔用的線程是誰就好了。翻越JDK提供的API,它建議咱們:應該將子類定義爲非公共內部幫助器類,可用它們來實現其封閉類的同步屬性。類 AbstractQueuedSynchronizer 沒有實現任何同步接口。而是定義了諸如 acquireInterruptibly(int) 之類的一些方法,在適當的時候能夠經過具體的鎖和相關同步器來調用它們,以實現其公共方法。
什麼意思呢?意思就是建議咱們:若是你想要使用AQS實現一把互斥鎖Mutex,就必須先用一個類去繼承AbstractQueuedSynchronizer這個抽象類,然而這個實現的子類(暫取名叫Sync)應該是做爲Mutex的內部類來用的,提供給Mutex看成幫助器來使用。那麼Lock接口,Mutex互斥鎖,AbstractQueuedSynchronizer抽象類和Sync幫助器這四者存在什麼聯繫呢?爲了不你聽糊塗了,下面我整理他們的UML類圖以下。
由上圖可知:Mutex互斥鎖繼承了Lock鎖的接口,具備鎖的屬性,能夠提供上鎖和釋放鎖的方法,他是對外提供服務的服務者,而Mutex類有個Sync類型的私有對象sync,這個私有對象繼承了AbstractQueuedSynchronizer抽象類,是Mutex鎖和AQS的橋樑,是加鎖和釋放鎖真正的服務者。若是你看明白了上面的UML類圖,那麼咱們的Mutex互斥鎖的定義應該以下:
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class Mutex implements Lock { private Sync sync = new Sync(); private class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { // TODO Auto-generated method stub return super.tryAcquire(arg); } @Override protected boolean tryRelease(int arg) { // TODO Auto-generated method stub return super.tryRelease(arg); } } @Override public void lock() { // TODO Auto-generated method stub } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public boolean tryLock() { // TODO Auto-generated method stub return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public void unlock() { // TODO Auto-generated method stub } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } }
這裏咱們實現的是獨佔式的鎖,Sync幫助器只須要覆蓋父類的tryAcquire(),tryRelease()方法就好了,其餘方法能夠暫時刪掉,如共享式的tryAcquireShared(),tryReleaseShared(),已經Condition用到的isHeldExclusively()和toString()方法均可以暫時不用實現,由於咱們只是想先用AQS來作一把能夠保證數據安全的鎖,考慮的問題暫時沒有那麼多。
/** * 互斥鎖 * @author 張仕宗 * @date 2018.11.9 */ public class Mutex implements Lock{ //AQS子類的對象,Mutex互斥鎖用它來工做 private Sync sync = new Sync(); //Sync同步器類做爲公共內部幫助器,可用它來實現其封閉類的同步屬性 private class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { assert arg == 1; //這裏用到了斷言,互斥鎖,鎖只能被獲取一次,若是arg不等於1,則直接中斷 if(this.compareAndSetState(0, 1)) { //這裏作一下判斷,若是state的值爲等於0,立馬將state設置爲1 //返回true,告訴acqure方法,獲取鎖成功 return true; } return false; } @Override protected boolean tryRelease(int arg) { //釋放鎖,因爲這是一把互斥鎖,state不是0就是1,因此你須要作兩步: //1.直接將state置爲0 this.setState(0); //返回true,告訴aqs的release方法釋放鎖成功 return true; } } /** * 上鎖的方法 */ @Override public void lock() { sync.acquire(1); } /** * 釋放鎖的方法 */ @Override public void unlock() { sync.release(1); } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public boolean tryLock() { // TODO Auto-generated method stub return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } }
上訴代碼實現了一把最簡單的鎖,咱們只實現其lock()和unlock()方法,其餘方法請暫時忽略,而lock()方法和unlock()方法是如何實現的呢?lock()方法調用了Sync幫助器對象的sync.acquire(1)方法,因爲咱們的幫助器Sync並無實現這個方法,因此實際調用的是AQS的acquire()方法,而AQS這時候作了什麼時呢?再來一次該方法的源碼:
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
次方法乾的第一件事就是去調用tryAcquire()方法,這個方法須要Sync來實現,若是本身的Sync沒有實現這個方法的話,父類會直接拋出UnsupportedOperationException這個異常。
@Override protected boolean tryAcquire(int arg) { assert arg == 1; //這裏用到了斷言,互斥鎖,鎖只能被獲取一次,若是arg不等於1,則直接中斷 if(this.compareAndSetState(0, 1)) { //這裏作一下判斷,若是state的值爲等於0,立馬將state設置爲1 //返回true,告訴acqure方法,獲取鎖成功 return true; } return false; }
因爲這是一把互斥鎖,因此只能有同一時刻只能得到一次鎖。代碼中用到了assert斷言,若是預得到鎖的次數不是1,則中斷。接下來if中判斷state狀態是否爲0,若是state狀態爲0,則說明鎖尚未被佔用,那麼我馬上佔用這把鎖,判斷state當前值和設置state爲1這兩步用原子性操做的代碼語句是this.compareAndSetState(0, 1),並立馬放回true,這時候AQS得到返回值,得到鎖成功。若是是第二個線程進來,if語句判斷獲得的值非0,則直接返回false,這時候AQS將新進來的線程放進FIFO隊列排隊。
接下來看看Mutex的unlock()方法,該方法調用了sync.release(1),看看AQS這時候作了什麼!
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
此方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。一樣的,咱們的同步器Sync須要去實現這個tryRelease方法,否則一樣會拋出UnsupportedOperationException異常。Sync的tryRelease方法比較簡單:
@Override protected boolean tryRelease(int arg) { //釋放鎖,因爲這是一把互斥鎖,state不是0就是1,因此你須要作兩步: //1.直接將state置爲0 this.setState(0); //返回true,告訴aqs的release方法釋放鎖成功 return true; }
只須要設置state爲0便可,因爲這是一把互斥鎖,state不是0就是1因此直接調用this.setSate(0)。
上訴的Mutex並不是一把可重入鎖,爲了實現這把鎖可以讓同一線程屢次進來,回憶一下上一篇博客中怎麼實現的?當時的作法是在鎖的lock()自旋方法中判斷新進來的是否是正在運行的線程,若是新進來的線程就是正在運行的線程,則獲取鎖成功,並讓計數器+1。而在釋放鎖的時候,若是釋放鎖的線程等於當前線程,讓計數器-1,只有當計數器count歸零的時候才真正的釋放鎖。一樣的,用AQS實現的鎖也是這個思路,那麼咱們的tryAcquice方法以下:
@Override protected boolean tryAcquire(int arg) { //若是第一個線程進來,直接得到鎖,並設置當前獨佔的線程爲當前線程 int state = this.getState(); if(state == 0) { //state爲0,說明當前沒有線程佔用該線程 if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個線程進來,馬上設置state爲arg this.setExclusiveOwnerThread(Thread.currentThread()); //設置當前獨佔線程爲當前線程 return true; //告訴頂級aqs獲取鎖成功 } } else { //若是是第二個線程進來 Thread currentThread = Thread.currentThread();//當前進來的線程 Thread ownerThread = this.getExclusiveOwnerThread();//已經保存進去的獨佔式線程 if(currentThread == ownerThread) { //判斷一下進來的線程和保存進去的線程是同一線程麼?若是是,則獲取鎖成功,若是不是則獲取鎖失敗 this.setState(state+arg); //設置state狀態 return true; } } return false; }
tryAcquice()方法代碼含義如註釋所示,與Mutex互斥鎖不一樣的是當state狀態不爲0時咱們的邏輯處理,若是第二次進來的線程currentThread和正在獨佔的線程ownerThread爲統一線程,第一步設置state增長1,第二步返回true給AQS。
tryRelease()方法代碼以下:
@Override protected boolean tryRelease(int arg) { //鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖 if(Thread.currentThread() != this.getExclusiveOwnerThread()) { //若是釋放鎖的不是當前線程,則拋出異常 throw new RuntimeException(); } int state = this.getState()-arg; //接下來判斷state是否已經歸零,只有state歸零的時候才真正的釋放鎖 if(state == 0) { //state已經歸零,作掃尾工做 this.setState(0); this.setExclusiveOwnerThread(null); return true; }以上大綱全部的內容羣內已經將知識體系整理好(源碼,筆記,PPT,學習視頻)進羣免費領取。 加QQ羣:897889510,免費領取資料 this.setState(state); return false; }
tryRelease()首先是獲取當前state的值,並對這個值進行欲判:若是當前值state減去sync.release()傳來的參數歸零,則真正的釋放鎖,那麼咱們要作的第一步是設置state爲0,接着設置當前獨佔的線程爲null,再而後返回true告訴AQS釋放鎖成功。若是若是當前值state減去sync.release()傳來的參數歸零,若是讓state的值爲state-arg相減以後的值。
目前爲此,咱們以來了AQS框架來改寫的重入鎖代碼以下:
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 用AQS實現的重入鎖 * @author 張仕宗 * @date 2018.11.9 */ public class MyAqsLock implements Lock{ //AQS子類的對象,用它來輔助MyAqsLock工做 private Sync sync = new Sync(); private class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { //若是第一個線程進來,直接得到鎖,並設置當前獨佔的線程爲當前線程 int state = this.getState(); if(state == 0) { //state爲0,說明當前沒有線程佔用該線程 if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個線程進來,馬上設置state爲arg this.setExclusiveOwnerThread(Thread.currentThread()); //設置當前獨佔線程爲當前線程 return true; //告訴頂級aqs獲取鎖成功 } } else { //若是是第二個線程進來 Thread currentThread = Thread.currentThread();//當前進來的線程 Thread ownerThread = this.getExclusiveOwnerThread();//已經保存進去的獨佔式線程 if(currentThread == ownerThread) { //判斷一下進來的線程和保存進去的線程是同一線程麼?若是是,則獲取鎖成功,若是不是則獲取鎖失敗 this.setState(state+arg); //設置state狀態 return true; } } return false; } @Override protected boolean tryRelease(int arg) { //鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖 if(Thread.currentThread() != this.getExclusiveOwnerThread()) { //若是釋放鎖的不是當前線程,則拋出異常 throw new RuntimeException(); } int state = this.getState()-arg; //接下來判斷state是否已經歸零,只有state歸零的時候才真正的釋放鎖 if(state == 0) { //state已經歸零,作掃尾工做 this.setState(0); this.setExclusiveOwnerThread(null); return true; } this.setState(state); return false; } public Condition newCondition() { return new ConditionObject(); } } /** * 上鎖的方法 */ @Override public void lock() { sync.acquire(1); } /** * 釋放鎖的方法 */ @Override public void unlock() { sync.release(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { //調用幫助器的tryAcquire方法,測試獲取鎖一次,不會自旋 return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //調用幫助器的tryRelease方法,測試釋放鎖一次,不會子旋 return sync.tryRelease(1); } @Override public Condition newCondition() { //調用幫助類獲取Condition對象 return sync.newCondition(); } }以上大綱全部的內容羣內已經將知識體系整理好(源碼,筆記,PPT,學習視頻)進羣免費領取。 加QQ羣:897889510,免費領取資料