好了,咱們來開始今天的內容,首先咱們來看下AQS
是什麼,全稱是 AbstractQueuedSynchronizer
翻譯過來就是【抽象隊列同步】對吧。經過名字咱們也能看出這是個抽象類
並且裏面定義了不少的方法
裏面這麼多方法,我們固然不是一個個去翻。裏面還有不少的抽象方法,我們還得找它的實現多麻煩對不對。因此咱們換個方式來探索。java
咱們先來看下這樣一個場景
在這裏咱們有一個能被多個線程共享操做的資源,在這個場景中應該能看出咱們的數據是不安全的,由於咱們並不能保證咱們的操做是原子操做對吧。基於這個場景咱們經過代碼來看看效果node
package com.example.demo; public class AtomicDemo { // 共享變量 private static int count = 0; // 操做共享變量的方法 public static void incr(){ // 爲了演示效果 休眠一會兒 try { Thread.sleep(1); count ++; } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 1000 ; i++) { new Thread(()->AtomicDemo.incr()).start(); } Thread.sleep(4000); System.out.println("result:" + count); } }
經過執行發現,執行的結果是一個不肯定的值,但老是會小於等於1000,至於緣由,是由於incr() 方法不是一個原子操做。爲何不是原子操做這個我們今天就不深究此處了.
迎合今天的主題,咱們經過Lock來解決安全
package com.example.demo; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class AtomicDemo { // 共享變量 private static int count = 0; private static Lock lock = new ReentrantLock(); // 操做共享變量的方法 public static void incr(){ // 爲了演示效果 休眠一會兒 try { lock.lock(); Thread.sleep(1); count ++; } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 1000 ; i++) { new Thread(()->AtomicDemo.incr()).start(); } Thread.sleep(4000); System.out.println("result:" + count); } }
而後咱們運行發現結果都是 1000了,這也就是1000個線程都去操做這個 count 變量,結果符合咱們的預期了。那lock究竟是怎麼實現的呢?函數
咱們先來分析分析
這樣的圖片看着比較複雜,我們簡化下。
咱們本身假設下,若是要你去設計這樣的方法,你應該要怎麼設計,他們須要實現哪些功能,
首先是lock方法,它是否是要知足這幾個功能。
需求清楚了,那咱們怎麼設計呢?
第一個互斥怎麼作,也就是多個線程只有一個線程能搶佔到資源,這個時候咱們能夠這樣設置源碼分析
// 給一個共享資源 Int state = 0 ; // 0表示資源沒有被佔用,能夠搶佔 if(state == 0 ){ // 表示能夠獲取鎖 }else{ // 表示鎖被搶佔 須要阻塞等待 }
而後就是沒有搶佔到鎖的線程的存儲,咱們能夠經過一個隊列,利用FIFO來實現存儲。
最後就是線程的阻塞和喚醒。你們說說有哪些阻塞線程的方式呀?ui
1.wait/notify: 不合適,不能喚醒指定的線程
2.Sleep:休眠,相似於定時器
3.Condition:能夠喚醒特定線程
4.LockSupport:
LockSupport.park():阻塞當前線程
LockSupport.unpark(Thread t):喚醒特定線程
結合今天的主題,咱們選擇LockSupport來實現阻塞和喚醒。
好了,到這兒咱們已經猜測到了Lock中的實現邏輯,可是在探究源碼以前咱們還有個概念須要先和你們講下,由於這個是咱們源碼中會接觸到的一個,先講了,看的時候就比較輕鬆了對吧。this
咱們先來看看重入鎖的場景代碼spa
package com.example.demo; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class AtomicDemo { // 共享變量 private static int count = 0; private static Lock lock = new ReentrantLock(); // 操做共享變量的方法 public static void incr(){ // 爲了演示效果 休眠一會兒 try { lock.lock(); Thread.sleep(1); count ++; // 調用了另一個方法。 decr(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } public static void decr(){ try { // 重入鎖 lock.lock(); count--; }catch(Exception e){ }finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 1000 ; i++) { new Thread(()->AtomicDemo.incr()).start(); } Thread.sleep(4000); System.out.println("result:" + count); } }
首先你們考慮這段代碼會死鎖嗎? 你們給我個回覆,我看看你們的理解的怎麼樣
好了,有說會死鎖的,有說不會,其實這兒是不會死鎖的,並且結果就是0.爲何呢?
這個實際上是鎖的一個嵌套,由於這兩把鎖都是同一個 線程對象,咱們講共享變量的設計是
當state=0;線程能夠搶佔到資源 state =1; 若是進去嵌套訪問 共享資源,這時 state = 2 若是有多個嵌套 state會一直累加,釋放資源的時候, state–,直到全部重入的鎖都釋放掉 state=0,那麼其餘線程才能繼續搶佔資源,說白了重入鎖的設計目的就是爲了防止 死鎖!線程
經過類圖咱們能夠發現右車的業務應用其實內在都有相識的設計,這裏咱們只須要搞清楚其中的一個,其餘的你本身應該就能夠看懂~,好了咱們就具體結合前面的案例代碼,以ReentrantLock爲例來介紹AQS的代碼實現。翻譯
在看源碼以前先回顧下這個圖,帶着問題去看,會更輕鬆
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
這個方法邏輯比較簡單,if條件成立說明 搶佔鎖成功並設置 當前線程爲獨佔鎖
else 表示搶佔失敗,acquire(1) 方法咱們後面具體介紹
compareAndSetState(0, 1):用到了CAS 是一個原子操做方法,底層是UnSafe.做用就是設置 共享操做的 state 由0到1. 若是state的值是0就修改成1
setExclusiveOwnerThread:代碼很簡單,進去看一眼便可
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1.tryAcquire()嘗試直接去獲取資源,若是成功則直接返回(這裏體現了非公平鎖,每一個線程獲取鎖時會嘗試直接搶佔加塞一次,而CLH隊列中可能還有別的線程在等待);
2.addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;
3.acquireQueued()使線程阻塞在等待隊列中獲取資源,一直獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。
固然這裏代碼的做用我是提早研究過的,對於你們確定不是很清楚,咱們繼續裏面去看,最後你們能夠回到這兒再論證。
再次嘗試搶佔鎖
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //再次嘗試搶佔鎖 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 重入鎖的狀況 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // false 表示搶佔失敗 return false; }
將阻塞的線程添加到雙向鏈表的結尾
private Node addWaiter(Node mode) { //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //嘗試快速方式直接放到隊尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失敗則經過enq入隊。 enq(node); return node; }
private Node enq(final Node node) { //CAS"自旋",直到成功加入隊尾 for (;;) { Node t = tail; if (t == null) { // 隊列爲空,建立一個空的標誌結點做爲head結點,並將tail也指向它。 if (compareAndSetHead(new Node())) tail = head; } else {//正常流程,放入隊尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
第一個if語句
else語句
線程3進來會執行以下代碼
那麼效果圖
acquireQueued(Node, int)
OK,經過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。聰明的你馬上應該能想到該線程下一部該幹什麼了吧:進入等待狀態休息,直到其餘線程完全釋放資源後喚醒本身,本身再拿到資源,而後就能夠去幹本身想幹的事了。沒錯,就是這樣!是否是跟醫院排隊拿號有點類似~~acquireQueued()就是幹這件事:在等待隊列中排隊拿號(中間沒其它事幹能夠休息),直到拿到號後再返回。這個函數很是關鍵,仍是上源碼吧:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//標記是否成功拿到資源 try { boolean interrupted = false;//標記等待過程當中是否被中斷過 //又是一個「自旋」! for (;;) { final Node p = node.predecessor();//拿到前驅 //若是前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源(多是老大釋放完資源喚醒本身的,固然也可能被interrupt了)。 if (p == head && tryAcquire(arg)) { setHead(node);//拿到資源後,將head指向該結點。因此head所指的標杆結點,就是當前獲取到資源的那個結點或null。 p.next = null; // setHead中node.prev已置爲null,此處再將head.next置爲null,就是爲了方便GC回收之前的head結點。也就意味着以前拿完資源的結點出隊了! failed = false; // 成功獲取資源 return interrupted;//返回等待過程當中是否被中斷過 } //若是本身能夠休息了,就經過park()進入waiting狀態,直到被unpark()。若是不可中斷的狀況下被中斷了,那麼會從park()中醒過來,發現拿不到資源,從而繼續進入park()等待。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;//若是等待過程當中被中斷過,哪怕只有那麼一次,就將interrupted標記爲true } } finally { if (failed) // 若是等待過程當中沒有成功獲取資源(如timeout,或者可中斷的狀況下被中斷了),那麼取消結點在隊列中的等待。 cancelAcquire(node); } }
到這裏了,咱們先不急着總結acquireQueued()的函數流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//拿到前驅的狀態 if (ws == Node.SIGNAL) //若是已經告訴前驅拿完號後通知本身一下,那就能夠安心休息了 return true; if (ws > 0) { /* * 若是前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。 * 注意:那些放棄的結點,因爲被本身「加塞」到它們前邊,它們至關於造成一個無引用鏈,稍後就會被保安大叔趕走了(GC回收)! */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //若是前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號後通知本身一下。有可能失敗,人家說不定剛剛釋放完呢! compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
整個流程中,若是前驅結點的狀態不是SIGNAL,那麼本身就不能安心去休息,須要去找個安心的休息點,同時能夠再嘗試下看有沒有機會輪到本身拿號。
若是線程找好安全休息點後,那就能夠安心去休息了。此方法就是讓線程去休息,真正進入等待狀態。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//調用park()使線程進入waiting狀態 return Thread.interrupted();//若是被喚醒,查看本身是否是被中斷的。 }
好了,咱們能夠小結下了。
看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),如今讓咱們再回到acquireQueued(),總結下該函數的具體流程:
1.結點進入隊尾後,檢查狀態,找到安全休息點;
2.調用park()進入waiting狀態,等待unpark()或interrupt()喚醒本身;
3.被喚醒後,看本身是否是有資格能拿到號。若是拿到,head指向當前結點,並返回從入隊到拿到號的整個過程當中是否被中斷過;若是沒拿到,繼續流程1。
最後咱們再回到前面的acquire方法來總結下
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
總結下它的流程吧
1.調用自定義同步器的tryAcquire()嘗試直接去獲取資源,若是成功則直接返回;
2.沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;
3.acquireQueued()使線程在等待隊列中休息,有機會時(輪到本身,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
4.若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。
好了,lock方法看完後,咱們再來看下unlock方法
它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。這也正是unlock()的語義,固然不只僅只限於unlock()
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head;//找到頭結點 if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒等待隊列裏的下一個線程 return true; } return false; }
此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:
public final boolean release(int arg) { if (tryRelease(arg)) {//這裏是先嚐試釋放一下資源,通常均可以釋放成功,除了屢次重入但只釋放一次的狀況。 Node h = head; //這裏判斷的是 阻塞隊列是否還存在和head節點是不是tail節點,由於以前說過,隊列的尾節點的waitStatus是爲0的 if (h != null && h.waitStatus != 0) //到這裏就說明head節點已經釋放成功啦,就先去叫醒後面的直接節點去搶資源吧 unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { //這裏,node通常爲當前線程所在的結點。 int ws = node.waitStatus; if (ws < 0)//置零當前線程所在的結點狀態,容許失敗。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//找到下一個須要喚醒的結點s if (s == null || s.waitStatus > 0) {//若是爲空或已取消 s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。 if (t.waitStatus <= 0)//從這裏能夠看出,<=0的結點,都是還有效的結點。 s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒 }
這個函數並不複雜。一句話歸納:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裏咱們也用s來表示吧。此時,再和acquireQueued()聯繫起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即便p!=head也不要緊,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裏既然s已是等待隊列中最前邊的那個未放棄線程了,那麼經過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),而後s把本身設置成head標杆結點,表示本身已經獲取到資源了,acquire()也返回了
好了,到這咱們就由於把源碼看完了,再回頭來看下這張圖
是否是就清楚了AQS究竟是怎麼實現的咱們上面的猜測的了吧。那麼對應的下課後讓你本身去看
這幾個的源碼,你是否是就應該能看懂了,好了本文就介紹到此,本文對你有幫助的歡迎關注點贊,謝謝