點讚的靚仔,你時人羣中最閃耀的光芒
AQS,英文全稱AbstractQueuedSynchronizer,直接翻譯爲抽象的隊列同步器。是JDK1.5出現的一個用於解決併發問題的工具類,由大名鼎鼎的Doug Lea打造,與synchornized關鍵字不一樣的是,AQS是經過代碼解決併發問題。html
併發問題是指在多線程運行環境下,共享資源安全的問題。
如今的銀行帳戶,經過銀行卡和手機銀行均可以操做帳戶, 若是咱們同時拿着銀行卡和存摺去銀行搞事情,會怎麼樣呢?java
package demo.pattren.aqs; public class Money { /** * 假設如今帳戶有1000塊錢 */ private int money = 1000; /** * 取錢 */ public void drawMoney(){ this.money--; } public static void main(String[] args) throws InterruptedException { Money money = new Money(); for(int i=0; i<1000; i++){ new Thread(() -> { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } money.drawMoney(); },i + "").start(); } Thread.sleep(2000); System.out.println("當前帳戶餘額:" + money.money); } }
這樣想着是否是立刻能夠去銀行搞一波事情? 哈哈,你想太多了,若是能這樣搞,銀行早破產了。咱們主要是來分析一下出現這個問題的緣由,JVM內存是JMM結構的,每一個線程操做的數據是從主內存中複製的一個和備份,而多個線程就會存在多個備份,當線程中的備份數據被修改時,會將值刷新到主內存,好比多個線程同時獲取到了帳戶的餘額爲500元,A線程存錢100,線程A將600刷新到主內存,$\color{red}{主內存並不會主動通知其餘線程此時值已經被修改}$,因此主內存的值此時與其餘線程的值是不一樣的,若是其餘線程再操做帳戶餘額,是在500的基礎上進行的,這顯然不是咱們想要的結果。node
JDK提供了多種解決多線程安全的方式。安全
volatile是JDK提供的關鍵字,用來修飾變量,volatile修飾的變量可以保證多個線程下的可見性,如上個案例,A修改了帳戶的餘額,而後將最新的值刷新到主內存,此時主內存會將最新的值同步到其餘線程。
volatile解決了多線程下數據讀取一致的問題,$\color{red}{即保證可見性,可是其並不能保證寫操做的原子性}$,
當多個線程同時寫操做的時候,即多個線程同時去將線程中最新的值刷新到主內存,將會出現問題。
經過volatile關鍵字修飾money變量,發下並不能解決線程安全問題。多線程
原子操做類是JDK提供的一系列保證原子操做的工具類,原子類能夠保證多線程環境下對其值的操做是安全的。併發
package demo.pattren.aqs; import java.util.concurrent.atomic.AtomicInteger; public class AtomicMoney { /** * 假設如今帳戶有1000塊錢 */ private AtomicInteger money = new AtomicInteger(1000); /** * 取錢 */ public void drawMoney(){ //AtomicInteger的自減操做 this.money.getAndDecrement(); } public static void main(String[] args) throws InterruptedException { AtomicMoney money = new AtomicMoney(); for(int i=0; i<1000; i++){ new Thread(() -> { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } money.drawMoney(); },i + "").start(); } Thread.sleep(2000); System.out.println("當前帳戶餘額:" + money.money); } }
屢次測試結果都是0,與預期一致。原子操做類是使用CAS(Compare and swap 比較並替換)的機制來保證操做的原子性,相對於鎖,他的併發性更高。jvm
synchronized關鍵字是jvm層面來保證線程安全的,經過在代碼塊先後添加monitorenter與monitorexit命令來保證線程的安全,並且在JDK1.6對synchronized關鍵字作了較大的優化,性能有了較大的提高。能夠肯定的是,經過synchronized確定能夠保證線程安全,因此使用synchronized也是很好的選擇,固然synchronized鎖的升級不可逆特徵,致使在高併發下性能是不能很好的保證。高併發
終於迎來了本篇文章的主角,前面的內容,其實與文章的主題AQS並無直接的關聯,就簡單帶過。前面不少都是JVM層面來保證線程安全的,而AQS則是徹底經過代碼層面來處理線程安全的。
(PS:小節標題明明是Lock鎖,怎麼寫AQS了,騙我讀書少)工具
博主怕捱打,正在全力解釋中~。先上類圖壓場!
如上圖,左邊是抽象隊列同步器,而右邊則是使用隊列同步器實現的功能——鎖、信號量、發令槍等。
能夠先不看源碼,我們本身思考,要以純代碼的方式實現應當考慮哪些問題?性能
對於一、2兩點,難度應帶不大,而三、4兩點如何去設計呢?咱們經過僞代碼預演操做流程。
在業務端,是這樣操做的。
加鎖 {須要被鎖住的代碼} 釋放鎖
加鎖與釋放鎖的邏輯
if(state == 0) 獲取到鎖 set(state == 1) else 繼續等待 while(true){ if(state == 0) 再次嘗試獲取鎖 }
這樣設計以後,整個操做流程再次變成了串行操做。
這和咱們去食堂排隊打飯是同樣的,食堂不可能爲每一個學生都開放一個窗口,因此多個學生就會爭搶有限的窗口,若是沒有必定的控制,那麼食堂每到吃飯的時候都是亂套的,一羣學生圍着窗口同時去打飯,想一想都是多麼的恐怖。而由此出現了排隊的機制,一個窗口同一時間打飯的人只能有一個,當前一我的離開窗口後,後面排隊的學生才能去打飯。
下面咱們深刻JDK源碼,領略大師級的代碼設計。
業務調用代碼:
package demo.aqs; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockMoney { Lock lock = new ReentrantLock(); /** * 假設如今帳戶有1000塊錢 */ private int money = 1000; //private int money = 1000; /** * 取錢 */ public void drawMoney(){ lock.lock(); this.money--; lock.unlock(); } public static void main(String[] args) throws InterruptedException { LockMoney money = new LockMoney(); for(int i=0; i<1000; i++){ new Thread(() -> { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } money.drawMoney(); },i + "").start(); } Thread.sleep(2000); System.out.println("當前帳戶餘額:" + money.money); } }
追蹤Lock方法:
直接看源碼基本一下子就暈車,我嘗試繪製出lock方法的調用鏈路。而後結合源碼解釋。
你們跟着箭頭走一遍源碼,多多少少可以體會到AQS的實現機制。
final void lock() { //CAS嘗試將state從0更新爲1,更新成功則執行if下面的代碼。 if (compareAndSetState(0, 1)) //獲取鎖成功,執行線程執行 setExclusiveOwnerThread(Thread.currentThread()); else //獲取鎖失敗,線程入隊列 acquire(1); }
看到這段代碼,是否是瞬間明白前面提到的一、2兩點問題。首先compareAndSetState方法是使用Unsafe直接操做內存而且使用樂觀鎖的方式,可以保證有且僅有一個線程可以操做成功,是多線程安全的。即設置將state設置爲1成功的線程可以搶佔到鎖(線程互斥),而沒有設置成功的線程將進行入隊操做(排隊等候),這樣感受瞬間明朗了許多,那咱們接着往下看。
public final void acquire(int arg) { //tryAcquire失敗而且acquireQueued成功,則調用selfInterrupt if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //當線程獲取鎖失敗而且線程阻塞失敗會中斷線程 selfInterrupt(); }
AbstractQueuedSynchronizor的tryAcquire方法,其最終調用到了Sync的nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //獲取當前鎖的狀態值 int c = getState(); // state = 0,表示當前鎖爲空閒狀態,其實這一段代碼和前面lock的方法是同樣的 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //不等於0 則判斷當前線程是否爲持有鎖的線程,若是是則執行代碼,這裏解決了重入鎖問題 else if (current == getExclusiveOwnerThread()) { //當前狀態值 + 1(能夠看前面的傳參) int nextc = c + acquires; // 囧, 這裏是超出了int的最大值纔會出現這樣的狀況 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //更新state的值 setState(nextc); return true; } return false; }
經過閱讀源碼,能夠發現,tryAcquire方法在當前線程獲取鎖成功或者是重入鎖的狀況下返回true,不然返回false。而同時這個方法解決了上面提到的第4點鎖重入的問題。ok,感受愈來愈接近真相了,接着看addWaiter方法。
理解addWaiter方法的代碼,先看方法中用的得Node對象。 Node對象是對Thread對象的封裝,使其具備線程的功能,同時他還有prev、next等屬性。那麼很明瞭,Node是一個鏈表結構的對象
//前一個結點 volatile Node prev; //下一個結點 volatile Node next;
同時AbstractQueuedSynchronizor中包含head、tail屬性
//Node鏈表的頭結點 private transient volatile Node head; //Node鏈表的尾結點 private transient volatile Node tail;
private Node addWaiter(Node mode) { //將當前線程包裝爲Node對象 Node node = new Node(Thread.currentThread(), mode); //獲取尾節點,當這段代碼第一次運行的時候,並無尾結點 //因此確定值爲null,那麼會執行下面的enq方法 Node pred = tail; //當再次運行代碼的時候,尾結點再也不爲null(enq方法初始化了尾結點,能夠先往下看enq方法源碼) if (pred != null) { //當前結點的前置結點指向以前的尾結點 node.prev = pred; //CAS嘗試將尾結點從pred設置爲node if (compareAndSetTail(pred, node)) { //設置成功則將pred的next結點執行node pred.next = node; return node; } } enq(node); return node; }
上面的解釋聽着有點繞腦殼。
不着急,咱們先看enq方法
private Node enq(final Node node) { //死循環 for (;;) { //獲取尾結點 Node t = tail; //尾結點爲空,則初始化尾結點和頭結點爲同一個新建立的Node對象 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //將當前結點設爲爲尾結點,並將前一個尾結點的next指向當前結點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; //退出循環 return t; } } } }
enq具體作了什麼事情呢:
這裏須要一些時間 + 空間的想象力,但若是對鏈表結構比較熟悉的話,這裏理解也是不太困難的。
咱們動態的想想執行過程:
final boolean acquireQueued(final Node node, int arg) { //局部變量 boolean failed = true; try { //局部變量 boolean interrupted = false; //死循環 for (;;) { //獲取前置結點 final Node p = node.predecessor(); //前置結點爲head而且嘗試獲取鎖成功,則不阻塞 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //阻塞操做 , 判斷是否應該阻塞 而且 阻塞是否成功 if ( //是否在搶佔鎖失敗後阻塞 shouldParkAfterFailedAcquire(p, node) && //Unsafe操做使線程阻塞 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
//Node pred 前置結點, Node node 當前結點 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取前置結點的等待狀態 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 喚醒信號,即前結點正常,就設置waitStatus爲SIGNAL,表示前置結點能夠喚醒當前結點,那 * 麼當前結點纔會安心的被阻塞(若是前置結點不正常,可能就會致使本身不能被喚醒,那確定不 * 能安心睡覺的) */ return true; if (ws > 0) { /* * 找到前置結點中waitStatus <= 0 的Node結點並設置爲當前結點的前置結點 * 此狀態表示結點不是處於正常狀態,那麼將他從鏈表中刪除,直到找到狀態正常的結點 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 當waitStatus = 0 或者 PROPAGATE(-3) 時,CAS設置值爲SIGNAL(-1) * 此狀態表示線程正常,但沒有設置喚醒,通常爲tail的前一個結點,那麼須要將其設置爲可喚醒 * 狀態(SIGNAL) */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
圖解以下。
至此,咱們瞭解了AQS對須要等待的線程存儲的過程。
而AQS的解鎖以及公平鎖、非公平鎖,共享鎖、獨享鎖等後續跟上。
參考資料:
https://www.cnblogs.com/water...
https://www.jianshu.com/p/d61...