html
AQS全稱AbstractQueuedSynchronizer
,即抽象的隊列同步器,是一種用來構建鎖和同步器的框架。node
基於AQS構建同步器:api
ReentrantLock安全
Semaphore併發
CountDownLatch框架
ReentrantReadWriteLockui
SynchronusQueuespa
FutureTask線程
優點:設計
AQS 解決了在實現同步器時涉及的大量細節問題,例如自定義標準同步狀態、FIFO 同步隊列。
基於 AQS 來構建同步器能夠帶來不少好處。它不只可以極大地減小實現工做,並且也沒必要處理在多個位置上發生的競爭問題。
若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,而且將共享資源設置爲鎖定狀態。若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。如圖所示:
Sync queue: 同步隊列,是一個雙向列表。包括head節點和tail節點。head節點主要用做後續的調度。
Condition queue: 非必須,單向列表。當程序中存在cindition的時候纔會存在此列表。
AQS使用一個int成員變量來表示同步狀態
使用Node實現FIFO隊列,能夠用於構建鎖或者其餘同步裝置
AQS資源共享方式:獨佔Exclusive(排它鎖模式)和共享Share(共享鎖模式)
AQS它的全部子類中,要麼實現並使用了它的獨佔功能的api,要麼使用了共享鎖的功能,而不會同時使用兩套api,即使是最有名的子類ReentrantReadWriteLock也是經過兩個內部類讀鎖和寫鎖分別實現了兩套api來實現的
state狀態使用volatile int類型的變量,表示當前同步狀態。state的訪問方式有三種:
getState()
setState()
compareAndSetState()
CANCELLED waitStatus值爲1時表示該線程節點已釋放(超時、中斷),已取消的節點不會再阻塞。
SIGNAL waitStatus爲-1時表示該線程的後續線程須要阻塞,即只要前置節點釋放鎖,就會通知標識爲 SIGNAL 狀態的後續節點的線程
CONDITION waitStatus爲-2時,表示該線程在condition隊列中阻塞(Condition有使用)
PROPAGATE waitStatus爲-3時,表示該線程以及後續線程進行無條件傳播(CountDownLatch中有使用)共享模式下, PROPAGATE 狀態的線程處於可運行狀態
由於只有前驅節點是head節點的節點才能被首先喚醒去進行同步狀態的獲取。當該節點獲取到同步狀態時,它會清除本身的值,將本身做爲head節點,以便喚醒下一個節點。
除了同步隊列以外,AQS中還存在Condition隊列,這是一個單向隊列。調用ConditionObject.await()方法,可以將當前線程封裝成Node加入到Condition隊列的末尾,而後將獲取的同步狀態釋放(即修改同步狀態的值,喚醒在同步隊列中的線程)。
Condition隊列也是FIFO。調用ConditionObject.signal()方法,可以喚醒firstWaiter節點,將其添加到同步隊列末尾。
在構建自定義同步器時,只須要依賴AQS底層再實現共享資源state的獲取與釋放操做便可。自定義同步器實現時主要實現如下幾種方法:
isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。
線程首先嚐試獲取鎖,若是失敗就將當前線程及等待狀態等信息包裝成一個node節點加入到FIFO隊列中。 接着會不斷的循環嘗試獲取鎖,條件是當前節點爲head的直接後繼纔會嘗試。若是失敗就會阻塞本身直到本身被喚醒。而當持有鎖的線程釋放鎖的時候,會喚醒隊列中的後繼線程。
所謂獨佔模式,即只容許一個線程獲取同步狀態,當這個線程尚未釋放同步狀態時,其餘線程是獲取不了的,只能加入到同步隊列,進行等待。
很明顯,咱們能夠將state的初始值設爲0,表示空閒。當一個線程獲取到同步狀態時,利用CAS操做讓state加1,表示非空閒,那麼其餘線程就只能等待了。釋放同步狀態時,不須要CAS操做,由於獨佔模式下只有一個線程能獲取到同步狀態。ReentrantLock、CyclicBarrier正是基於此設計的。
例如,ReentrantLock,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。
獨佔模式下的AQS是不響應中斷的,指的是加入到同步隊列中的線程,若是由於中斷而被喚醒的話,不會當即返回,而且拋出InterruptedException。而是再次去判斷其前驅節點是否爲head節點,決定是否爭搶同步狀態。若是其前驅節點不是head節點或者爭搶同步狀態失敗,那麼再次掛起。
acquire以獨佔exclusive方式獲取資源。若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。源碼以下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
流程圖:
調用自定義同步器的tryAcquire()
嘗試直接去獲取資源,若是成功則直接返回;
沒成功,則addWaiter()
將該線程加入等待隊列的尾部,並標記爲獨佔模式;
acquireQueued()
使線程在等待隊列中休息,有機會時(輪到本身,會被unpark()
)會去嘗試獲取資源。獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt()
,將中斷補上。
tryAcquire
嘗試以獨佔的方式獲取資源,若是獲取成功,則直接返回true
,不然直接返回false
,且具體實現由自定義AQS的同步器實現的。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
根據不一樣模式(Node.EXCLUSIVE
互斥模式、Node.SHARED
共享模式)建立結點並以CAS的方式將當前線程節點加入到不爲空的等待隊列的末尾(經過compareAndSetTail()
方法)。若是隊列爲空,經過enq(node)
方法初始化一個等待隊列,並返回當前節點。
/** * 參數 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * 返回值 * @return the new node */ private Node addWaiter(Node mode) { //將當前線程以指定的模式建立節點node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 獲取當前同隊列的尾節點 Node pred = tail; //隊列不爲空,將新的node加入等待隊列中 if (pred != null) { node.prev = pred; //CAS方式將當前節點尾插入隊列中 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //當隊列爲empty或者CAS失敗時會調用enq方法處理 enq(node); return node; }
其中,隊列爲empty,使用enq(node)
處理,將當前節點插入等待隊列,若是隊列爲空,則初始化當前隊列。全部操做都是CAS自旋的方式進行,直到成功加入隊尾爲止。
private Node enq(final Node node) { //不斷自旋 for (;;) { Node t = tail; //當前隊列爲empty if (t == null) { // Must initialize //完成隊列初始化操做,頭結點中不放數據,只是做爲起始標記,lazy-load,在第一次用的時候new if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //不斷將當前節點使用CAS尾插入隊列中直到成功爲止 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued
用於已在隊列中的線程以獨佔且不間斷模式獲取state狀態,直到獲取鎖後返回。主要流程:
結點node進入隊列尾部後,檢查狀態;
調用park()進入waiting狀態,等待unpark()或interrupt()喚醒;
被喚醒後,是否獲取到鎖。若是獲取到,head指向當前結點,並返回從入隊到獲取鎖的整個過程當中是否被中斷過;若是沒獲取到,繼續流程1
final boolean acquireQueued(final Node node, int arg) { //是否已獲取鎖的標誌,默認爲true 即爲還沒有 boolean failed = true; try { //等待中是否被中斷過的標記 boolean interrupted = false; for (;;) { //獲取前節點 final Node p = node.predecessor(); //若是當前節點已經成爲頭結點,嘗試獲取鎖(tryAcquire)成功,而後返回 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //shouldParkAfterFailedAcquire根據對當前節點的前一個節點的狀態進行判斷,對當前節點作出不一樣的操做 //parkAndCheckInterrupt讓線程進入等待狀態,並檢查當前線程是否被能夠被中斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //將當前節點設置爲取消狀態;取消狀態設置爲1 if (failed) cancelAcquire(node); } }
release方法是獨佔exclusive模式下線程釋放共享資源的鎖。它會調用tryRelease()釋放同步資源,若是所有釋放了同步狀態爲空閒(即state=0),當同步狀態爲空閒時,它會喚醒等待隊列裏的其餘線程來獲取資源。這也正是unlock()的語義,固然不只僅只限於unlock().
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease()
跟tryAcquire()
同樣實現都是由自定義定時器以獨佔exclusive模式實現的。由於其是獨佔模式,不須要考慮線程安全的問題去釋放共享資源,直接減掉相應量的資源便可(state-=arg)。並且tryRelease()
的返回值表明着該線程是否已經完成資源的釋放,所以在自定義同步器的tryRelease()
時,須要明確這條件,當已經完全釋放資源(state=0),要返回true,不然返回false。
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
ReentrantReadWriteLock的實現:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //減掉相應量的資源(state-=arg) int nextc = getState() - releases; //是否徹底釋放資源 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
unparkSuccessor
用unpark()喚醒等待隊列中最前驅的那個未放棄線程,此線程並不必定是當前節點的next節點,而是下一個能夠用來喚醒的線程,若是這個節點存在,調用unpark()方法喚醒。
private void unparkSuccessor(Node node) { //當前線程所在的結點node int ws = node.waitStatus; //置零當前線程所在的結點狀態,容許失敗 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //找到下一個須要喚醒的結點 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 從後向前找 for (Node t = tail; t != null && t != node; t = t.prev) //從這裏能夠看出,<=0的結點,都是還有效的結點 if (t.waitStatus <= 0) s = t; } if (s != null) //喚醒 LockSupport.unpark(s.thread); }
共享模式,固然是容許多個線程同時獲取到同步狀態,共享模式下的AQS也是不響應中斷的.
很明顯,咱們能夠將state的初始值設爲N(N > 0),表示空閒。每當一個線程獲取到同步狀態時,就利用CAS操做讓state減1,直到減到0表示非空閒,其餘線程就只能加入到同步隊列,進行等待。釋放同步狀態時,須要CAS操做,由於共享模式下,有多個線程能獲取到同步狀態。CountDownLatch、Semaphore正是基於此設計的。
例如,CountDownLatch,任務分爲N個子線程去執行,同步狀態state也初始化爲N(注意N要與線程個數一致):
acquireShared
在共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
流程:
先經過tryAcquireShared()嘗試獲取資源,成功則直接返回;
失敗則經過doAcquireShared()中的park()進入等待隊列,直到被unpark()/interrupt()併成功獲取到資源才返回(整個等待過程也是忽略中斷響應)。
tryAcquireShared()
跟獨佔模式獲取資源方法同樣實現都是由自定義同步器去實現。但AQS規範中已定義好tryAcquireShared()
的返回值:
負值表明獲取失敗;
0表明獲取成功,但沒有剩餘資源;
正數表示獲取成功,還有剩餘資源,其餘線程還能夠去獲取。
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
doAcquireShared()
用於將當前線程加入等待隊列尾部休息,直到其餘線程釋放資源喚醒本身,本身成功拿到相應量的資源後才返回。
private void doAcquireShared(int arg) { //加入隊列尾部 final Node node = addWaiter(Node.SHARED); //是否成功標誌 boolean failed = true; try { //等待過程當中是否被中斷過的標誌 boolean interrupted = false; for (;;) { final Node p = node.predecessor();//獲取前驅節點 if (p == head) {//若是到head的下一個,由於head是拿到資源的線程,此時node被喚醒,極可能是head用完資源來喚醒本身的 int r = tryAcquireShared(arg);//嘗試獲取資源 if (r >= 0) {//成功 setHeadAndPropagate(node, r);//將head指向本身,還有剩餘資源能夠再喚醒以後的線程 p.next = null; // help GC if (interrupted)//若是等待過程當中被打斷過,此時將中斷補上。 selfInterrupt(); failed = false; return; } } //判斷狀態,隊列尋找一個適合位置,進入waiting狀態,等着被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
releaseShared()
用於共享模式下線程釋放共享資源,釋放指定量的資源,若是成功釋放且容許喚醒等待線程,它會喚醒等待隊列裏的其餘線程來獲取資源。
public final boolean releaseShared(int arg) { //嘗試釋放資源 if (tryReleaseShared(arg)) { //喚醒後繼結點 doReleaseShared(); return true; } return false; }
獨佔模式下的tryRelease()在徹底釋放掉資源(state=0)後,纔會返回true去喚醒其餘線程,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制必定量的線程併發執行,那麼擁有資源的線程在釋放掉部分資源時就能夠喚醒後繼等待結點。
https://www.cnblogs.com/waterystone/p/4920797.html
doReleaseShared()
主要用於喚醒後繼節點線程,當state爲正數,去獲取剩餘共享資源;當state=0時去獲取共享資源。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //喚醒後繼 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // head發生變化 if (h == head) break; } }
各位看官還能夠嗎?喜歡的話,動動手指點個贊💗,點個關注唄!!謝謝支持!
也歡迎關注公衆號【Ccww筆記】,原創技術文章第一時間推出