本文介紹一下Java併發框架AQS,這是大神Doug Lea在JDK5的時候設計的一個抽象類,主要用於併發方面,功能強大。在新增的併發包中,不少工具類都能看到這個的影子,好比:CountDownLatch、Semaphore、ReentrantLock等,其內部基本都有一個Sync對象是實現了這個AQS這個抽象類,只是實現的過程不一樣而已,造就了這些不一樣功能特色的併發工具類,不得不說這個併發框架的強大。理解了這個抽象類,咱們能夠設計本身的併發工具類,達到相關的目的。html
其父類AbstractOwnableSynchronizer很簡單,就一個成員變量exclusiveOwnerThread,用於展現當前全部者的線程。要理解這個框架的用途,最快的方法就是看代碼上的註釋了。這裏對註釋進行簡要複述:java
基於先入先出隊列提供了一個用於實現阻塞鎖和相關同步器(信號量和事件等)的一個框架。這個類被設計成一個對於實現大部分同步器很是有效的基本類,基於一個原子的int值,表示狀態state。子類必須定義protected方法,來改變這個state。這個state意味着這個對象被獲取或者是釋放。除了這些方法,類中的其餘方法實現了全部的排隊和阻塞機制。子類能夠維護其餘狀態字段,可是隻能經過相關方法進行原子更新(compareAndSetState),以便同步過程被追蹤。node
子類應該被定義成一個非public的內部類,被外部類用來實現特性。這個類提供了默認的排他和共享模式。在排他模式中,其餘線程請求鎖都不會成功。共享模式多線程請求可能成功但不是必須。這個類並不清楚共享模式請求成功的意義,下一個等待線程也須要再次判斷是否可以請求。一般只能實現一種模式,可是也能夠同時實現共享和排他模式,能夠參考ReadWriteLock。子類若是隻支持一種模式,能夠不須要實現不適用的模式的方法。設計模式
這個類定義了一個嵌套類ConditionObject能夠用於子類支持排他模式的實現,isHeldExclusively方法用於判斷當前線程是不是持有排他鎖的線程,release方法只有在當前state徹底釋放了這個對象才被調用,acquire方法給定保存的狀態值,最終恢復到acquire以前的狀態。AQS中沒有方法建立這樣的一個condition,因此若是不能知足這些約束,不要使用它。condtionObject的行爲取決於它的同步器語義的實現。安全
該類爲內部隊列提供檢查、插裝和監視方法,condition object也有的相似方法。序列化這個類只會保存狀態,相關線程會丟失。多線程
若是須要使用這個類做爲同步器的基本實現,須要實現下面的方法,經過檢查或者修改同步器的狀態(方法有getState、setState、compareAndSetState):併發
1.tryAcquire框架
2.tryReleaseide
3.tryAcquireShared工具
4.tryReleaseShared
5.isHeldExclusively
實現的時候必須確保這些方法的內部線程安全,而且運行時間要短,非阻塞。定義這些方法是使用這個類惟一支持的事情,其餘方法都被聲明瞭,他們不能獨立變化。
上面的解釋很清楚了,首先是一個FIFO隊列。這個隊列是CLH(Craig,Landin,Hagersten)鎖隊列的變種。CLH鎖常常用於自旋鎖,在這裏用於鎖同步器,同時使用一些基本策略保存控制信息。status字段用於跟蹤一個線程是否應該被鎖。一個節點在它的前置節點釋放時被觸發。每個節點都做爲一個特定觸發風格的監視器,並持有一個等待中的線程。狀態字段不控制線程是否被授予鎖等等。一個在隊列中的首節點會嘗試獲取,可是不保證第一個就會成功,因此它可能從新等待。
入隊列就是放在tail,出隊列就是設置head。下面看看節點的定義:
首先說明一下幾個常量,這些都是用於status:
CANCELLED:這個節點因爲中斷或者超時,任務被取消了,爲這個狀態的節點狀態再也不改變,固然也不會被阻塞。
SIGNAL:當前節點的後繼節點是阻塞的,因此當前節點在釋放或者取消的時候必須喚醒後繼節點。爲了不競爭,acquire方法必須首先代表它們須要一個signal,而後再嘗試原子獲取鎖,最終失敗阻塞。
CONDITION:這個節點是一個條件隊列,它不會用做同步隊列節點知道被調用,此時status應該被設置成0(這個值與字段其餘用途無關,簡化告終構)
PROPAGATE:釋放共享鎖的時候,須要傳播通知其它節點。僅設置在頭節點上,即使有其餘操做干擾也要確保傳播的持續。
其餘的字段比較好理解,waitStatus就是狀態,prev前節點,next後節點,thread當前等待線程,nextWaiter是用於condition的,或者這個值是SHARED,代表是共享模式,isShared()方法就是經過nextWaiter==SHARED來進行判斷的。
這個類很簡單,就兩個節點firstWaiter、lastWaiter,都是2.1中的Node類型。結合上面的內容也能夠很容易明白這裏的做用了,將同一個條件的等待者構形成雙端鏈表放置在了一塊兒,能夠觸發相關方法,通知全部條件等待者。
addConditionWaiter():判斷尾節點是否被取消,取消移除全部被取消的節點,添加到鏈表尾。private方法,非線程安全。
doSignal():不斷的經過CAS操做將firstWaiter的status設置成0,成功後將該節點CAS入同步器的隊列尾,返回上一個隊列節點,判斷其等待狀態若是大於0,或者是設置成signal失敗,LockSupport,unpark當前節點的線程,完成通知操做。private方法。這個操做含義就是設置Condition的頭個等待節點的等待狀態是condition,設置失敗,意味着任務取消找到能夠設置的頭個節點。成功後就要將這個節點加入到同步器的隊列中,而且要保證前一個節點指示這個condition分支節點是等待狀態,因此前一個節點不能是cancelled或者設置成signal起始信號失敗,出現了這種狀況要釋放線程,從新進行同步。
signal():public final方法,先判斷是不是持有排他鎖。以前說過condition通常是排他模式,這個方法只有佔用鎖的線程才能發起signal信號,通知其它線程。最後調用的就是private的doSignal方法。
awaitUninterruptibly():這個實現了uninterruptible的條件等待,將線程添加到condition的等待鏈表中,而後嘗試釋放一下當前同步器,執行tryRelease,傳入當前同步器的狀態。若是成功,釋放頭節點的後繼節點,返回當前的同步器狀態。若是添加的這個節點不在同步隊列中,阻塞這個節點,直到這個節點被添加到隊列中。方法acquireQueued的主要工做就是:若是當該節點的前置節點成爲隊列的頭節點時,tryAcquire判斷是否能獲取排他鎖,前置節點獲取了排他鎖將該節點設置成頭節點,這是個自旋的過程,返回這個過程當中線程是否中斷的判斷,或者線程在此期間被中斷,標記線程中斷。
await():這類方法的實現差很少,都是先添加到condition的等待列表,而後嘗試根據同步器狀態判斷是否能夠釋放當前頭節點的後繼阻塞節點。後面同樣判斷這個condition是否保存在了FIFO隊列中,沒有阻塞這個節點,知道喚醒後進入了排隊等待隊列。後續也是自旋等待到本身這個節點進行操做。最後也就是判斷是否中斷,中斷模式執行相應的操做。
其餘方法不進行介紹,condition相關的主要就是介紹一下signal和await都幹了什麼。若是上面理解不了,能夠結合2.3中對主要方法的說明,來理解一下這幾個condition的操做。
AQS裏面的內容十分簡單,就5個內容:unsafe用於操做CAS,head隊列的頭節點,tail隊列的尾節點,state同步器當前狀態,spinForTimeoutThreshold自旋超時時間。裏面的方法在condition那裏已經介紹了一些,主要分爲:1.操做state的,2.CAS操做同步器狀態的,包括head、tail和state,3.查詢某個condition等待的線程狀況的,4.操做隊列的方法。
enq(Node):等待節點入隊列尾,返回其前驅節點。循環CAS操做,線程安全。
addWaiter(Node mode):用所給的模式,設置本線程的節點。嘗試通常執行入隊列,成功則沒有競爭安全,不成功調用enq方法入隊列,保證競爭中安全。
unparkSuccessor(Node node):喚醒後置節點,若是本節點狀態是負數,則嘗試修改其爲0,失敗也不要緊,關注的主要是後繼節點。這裏會選擇一個未取消的後繼節點,而後執行LockSupport.unpark(s.thread),喚醒這個線程。
doReleaseShared():share模式下的release操做。先是肯定head的狀態是signal,將其轉換成0,並調用unparkSuccessor方法喚醒後繼節點。成功後若是狀態是0,將其狀態改成傳播狀態propagate。所有設置好了,就退出循環的CAS操做。
setHeadAndPropagate(Node node):設置頭節點,並傳播。要求參數大於0,或者節點爲null,或者節點狀態爲負,另外要求後繼節點若是存在必須是share模式,以後就調用doReleaseShared方法。
cancelAcquire(Node node):取消一個正在請求獲取的節點。找到該節點的前置非取消的第一個節點(往前數),將這個pred設置爲它的前置節點。節點的狀態變成取狀態,若是該節點正好在尾部,移除掉,將pred設置爲隊列的尾節點,另外將pred.next設置成其後繼節點。若是這個pred節點不是頭節點,狀態是signal或者小於0並修改爲了signal,將此節點的next節點設置成pred的next。若是都不是,調用unparkSuccessor方法,直接喚醒後繼節點便可。
shouldParkAfterFailedAcquire(Node pred, Node node):檢查並更新獲取失敗的節點的狀態,若是該線程須要阻塞,返回true。要求pred = node.prev。pred的狀態原本就是signal,返回true。若是pred取消了,繼續找node的前一個未取消的節點,設置相關依賴,不然,將其改爲signal,最後返回false。
acquireQueued(Node node, int arg):這個以前介紹過,用於排他模式下的uninterruptible。主要做用就是找到該節點的前置節點,若是是head,就嘗試獲取鎖,若是成功了,將這個設置爲頭節點。不是head,就調用shouldParkAfterFailedAcquire方法,做爲一個失敗的後繼節點,設置狀態成須要阻塞。
其餘方法再也不過多描述,基本看明白這幾個方法幹了些什麼事情就能夠了,下面結合具體的使用,來從全局的角度看一下這個併發框架是如何使用的。
private static class Test { private int n; public int getN() { return n; } public void inc() { n++; } } public static void main(String[] args) throws InterruptedException { int threadN = 20; int cycle = 10000; final Test test = new Test(); for(int i = 0; i < threadN; i++) { new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < cycle; i++) { test.inc(); } System.out.println("ok"); } }).start(); } Thread.sleep(3000); System.out.println(test.getN()); }
上面是一段測試代碼,很顯然是線程不安全的,這個時候使用ReentrantLock修改一下Test類就能夠保證其線程安全了。
private static class Test { private ReentrantLock lock = new ReentrantLock(); private int n; public int getN() { return n; } public void inc() { try { lock.lock(); n++; } finally { lock.unlock(); } } }
ReentrantLock毫無疑問是實現了一個鎖的問題,而且其實現的仍是可重入鎖。下面着重關注lock和unlock的實現,來看待AQS的運行。
ReentrantLock實現了兩種鎖,公平鎖和非公平鎖,公平就是按照阻塞順序進行鎖的獲取,非公平就是競爭獲取鎖,默認採起的是非公平鎖,公平鎖會有額外的開銷。
先看這個非公平鎖的lock方法:
1.lock方法先是嘗試無鎖的狀況下,獲取鎖,成功了就設置當前持有鎖的線程。失敗了調用acquire方法,傳入獲取1。
2.acquire方法主要是先調用了tryAcquire(1)方法請求獲取鎖,失敗了就先建立一個排他模式的等待節點,執行acquireQueued方法,這個方法上面說過,會讓當前節點的前置節點若是是head進行獲取鎖嘗試,成功了,那麼該節點就變成了第一個節點,失敗了這個節點就要變成一個阻塞節點,並檢測節點是否取消。
3.tryAcquire嘗試獲取鎖的方法在這裏就是調用了nonfairTryAcquire方法,這個方法中先判斷當前同步器的狀態,若是是0,意味着這期間鎖被釋放了,馬上嘗試獲取一下,成功同樣是設置此線程是鎖的全部者線程,返回true,這樣2步驟的阻塞就不須要了,直接執行便可。若是當前線程就是排他線程,則state狀態累加,注意這就是可重入鎖的含義,lock了多少次,就必須unlock多少次。更新這個狀態。都不是天然不可能獲取鎖,執行2後面的阻塞動做。
unlock方法就是release方法:
1.release是AQS的方法,其調用tryRelease(arg)方法進行嘗試釋放鎖,成功了以後會喚醒後繼節點,調用的就是unparkSuccessor方法。
2.和上面的同樣,unlock就是要state扣除了,若是調用unlock方法的線程不是持有鎖的線程會拋出異常,若是state變成0了,就意味着沒有線程擁有鎖,設置state,清楚ownThread。
看到這裏就很清楚了,首先就是state應該算是記錄了多少次調用鎖lock的方法的次數,爲0的時候意味着沒有競爭,不須要阻塞,但狀態仍是要設置的,大於0就意味的被持有了,排他的話就須要放入等待隊列了。unlock的時候就須要減去目標的state的值了,並喚醒等待隊列中後一個節點來執行後續步驟。
這裏有個很奇怪的事情,非公平鎖不是說不以請求順序決定的嗎,喚醒下一個等待節點那不是有序的?問題就在於構建等待隊列的時候並無保證順序,非公平的步驟中咱們能夠看到其在判斷state爲0的時候直接進行嘗試獲取了鎖,可是頗有可能剛剛有個線程釋放了鎖,原本是其後繼節點得到鎖的,這樣就插了一個隊。
看到這裏咱們就能明白公平鎖之因此公平,問題就出在其獲取鎖的實現方式了:
這裏能夠看到,公平鎖和非公平鎖惟一的區別就在於公平鎖在state爲0的時候並無馬上獲取鎖,而是判斷隊列中是否有等待節點,沒有才能直接獲取。這就是ReentrantLock中的公平與非公平鎖的區別。可是公平鎖必定公平嗎?這個就很差說了,由於兩個線程在插入等待隊列中的時候依舊存在競爭,順序可能打亂,可是從本意上講那也是賽跑看誰先排上隊,至少排隊肯定了以後就是公平的了。
上面講的ReentrantLock無疑是一個排他模式的AQS的運用,這裏講的CountDownLatch就是一個共享模式的運用了。這個類的做用頗有意思,比如倒計時,主要目標就是全部子線程任務都完成了,計數減一,最終主線程才能執行下去。下面給一個使用例子:
public static void main(String[] args) throws InterruptedException { int thread = 5; CountDownLatch latch = new CountDownLatch(thread); for(int i = 0; i < thread; i++) { final int n = i; new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(n*1000); System.out.println("ok"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } latch.await(); System.out.println("over"); }
若是沒有latch,開啓多個線程執行會直接退出,由於主線程已經執行完了,有了這個以後就須要等待最初設置數量的子線程執行完畢纔可。
能夠看到代碼中子線程並無任何阻塞,想象一下大體就能明白CountDownLatch的實現過程了。先傳入一個設置的state,這個就是new建立時傳入的整數了。
countdown方法調用了AQS的releaseShared(1)方法,實際上就回調了一些本身須要覆寫的tryReleaseShared(方法),因爲沒有實質上生成等待節點,因此doReleaseShared是沒有太大意義的。tryReleaseShared方法主要就是將state的計數經過循環CAS進行減1了,若是是state是0直接返回false,意味着多調用了一次,不正確。
上面的說明基本能搞明白CountDownLatch的一個基本實現,可是主線程是如何實現阻塞的呢?繼續看代碼:await調用了AQS的acquireSharedInterruptibly的方法,反過來回調了tryAcquireShared方法,判斷是是否小於0,小於0纔會執行後續操做。因此在tryAcquireShared的方法中,若是state爲0,意味着全部子線程執行完了,那還等啥,返回一個1,不須要執行等待操做了。反之返回一個-1,執行AQS的doAcquireSharedInterruptibly方法吧。這個方法會建立一個SHARE模式的等待節點,並不斷循環查看tryAcquireShared,這個方法前面說了,判斷的是當前是否全部子線程執行完了,因此當返回大於0的值時,就調用share模式的傳播方法,實際上主線程根本不會阻塞,只是在不斷的循環而已。直到調用setHeadAndPropagate方法,執行完畢後就返回了。
這個鎖也是以前提到的,是一個很神奇的類,讀寫鎖並存(即共享模式和排他模式)。更神奇的是讀鎖後接了寫鎖,鎖即升級了,再也不容許讀操做,排隊等寫完成,全部的讀完成後就切換成寫鎖了。寫操做完成後,讀操做又能夠進行了。這是怎麼實現的呢?
實際上讀寫鎖的內部實現了3個內容,讀寫鎖,和Sync鎖。也分爲公平和非公平。讀寫鎖對象內部持有的鎖就是父類的Sync鎖,這樣他們就創建起了聯繫。
首先看讀鎖,先調用的就是sync的acquireShared(1)方法,這個毫無疑問。以後就會回調tryAcquireShared,嘗試獲取鎖,具體步驟以下:
經過exclusiveCount方法咱們能夠明白這裏面的門道了。state是一個int類型,讀寫鎖將state設計成前16位存儲寫鎖對象,後16位設計成存放讀鎖對象。寫鎖不爲0且同步器當前全部線程不是這個線程,返回-1,執行後續的doAcquireShared(1)方法,進入等待。這就是上面說的,寫鎖存在的時候,讀鎖須要等待寫鎖完成。若是能夠加讀鎖,分紅首次等不一樣狀況進行處理。
讀鎖的unlock方法同樣調用了releaseShared(1)方法,最終調用了就是tryReleaseShared的方法。
過程大體上就是讀鎖的一個逆過程。至於寫鎖的特性繼續看,鎖住的邏輯基本一致,主要關係tryAcquire的方法:
獲取狀態和寫鎖的數量,若是存在鎖,寫鎖爲0,但當前的線程不是持有鎖的線程,不能獲取鎖,即有讀鎖不能加寫鎖,有寫鎖不是本線程也不行。達到鎖上限也不行,都經過了就能夠獲取寫鎖了。失敗了天然去了阻塞隊列,等待寫鎖完成,或其餘寫鎖完成釋放鎖了。沒有設置過鎖就進行設置。
寫鎖解鎖的方法很簡單,就是一個普通的鎖去除而已。
信號量也是一個有特色的設計類,簡單的理解爲其控制最大併發數。它維持着一個憑證數量,每一個線程須要獲取一個憑證,執行完歸還。憑證借完了以後就沒有了,新的線程只能等待其餘線程歸還憑證了。下面一個例子能夠運行看看。
public static void main(String[] args) throws InterruptedException { final Semaphore semaphore = new Semaphore(2); for(int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(); Thread.sleep(1000); System.out.println(System.currentTimeMillis()); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } Thread.sleep(4000); }
上面代碼就能看到信號量的一個使用方法了,先獲取一個憑證,執行後續代碼,執行完成以後釋放憑證。固然上述例子很不嚴謹,拋出異常後釋放動做就沒法執行了。
看到這裏根據以前所說的,就能猜想一下信號量的實現過程了。首先因爲容許多個線程同時容許,因此必定是共享模式。另外構造方法中的參數必定是同步器的state。acquire方法就應該是反過來的-1操做了,減到零就阻塞。release操做就是反過來的+1操做了。
簡單看一下果真是這麼作的,另外要注意的就是release操做若是亂用是會超過最初設置的容許大小的,好比acquire(1),可是release(2),Semaphore是不會關心這個的。能夠將上面的例子改爲這個試試。
以前我看到一道題,兩個線程如何交替打印1,2,3...。第一反應就是使用notify和wait來實現了,可是說Semaphore能夠實現。看到這裏你會怎麼使用semaphore來實現呢?可能想使用憑證爲1的信號量,公平鎖就能夠了啊。其實是不行的,緣由以前說過,這裏的公平含義和你所想的是有出入的,不必定是交替執行的。那麼不能實現了嗎?也不是,這裏有個頗有意思的操做,直接看代碼便可:
public static void test() { final Semaphore[] semaphores = new Semaphore[]{new Semaphore(1), new Semaphore(0)}; for(int i = 0; i < 2; i++) { final int n = i; new Thread(new Runnable() { @Override public void run() { int index = n; Semaphore aim = semaphores[index]; Semaphore other = semaphores[1-index]; for(int i = 0; i < 10; i++) { try { aim.acquire(); System.out.println(Thread.currentThread().getName()+":"+SemaphoreTest.getAndInc()); other.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "Thread-" + i).start(); } }
其中有個方法就是自增操做,不列出來了。
這個類雖然和AQS沒有直接的關係,可是其實現使用到的是ReentrantLock,也不是徹底無關,另外就是此類的功能也有特色。這個是一個欄閂,能夠看做是屏障或者是起跑線,做用就是隻有知足最初設置數量的線程在等待,纔會開始運行,就像有個欄杆把全部線程攔住了,必須等人到齊,才能運行。這個類的例子有不少,好比:這裏。本文就不給出了。
主要方法就一個dowait方法,parties表示總參與者數量,count表示未就位的參與者數量。generation表示一個週期,就像連接中的例子給出來的,能夠重複使用,進行二段賽跑。breakBarrier和nextGeneration都是結束本輪,進行下一輪的相關方法。
最後仍是關注一下dowait方法的實現,首先就是加鎖,一個個來。1.查看本輪是否結束,結束拋出異常。2.查看線程是否中斷,中斷結束本輪。3.count未參與者數量減1,若是減到零說明全部的人準備齊了,進入下一輪nextGeneration,返回。4.使用condition的await方法。這樣在有喚醒操做的時候就能夠全部線程繼續運行下去了,這裏喚醒動做就在breakBarrier和nextGenenration裏面了,步驟2中減到零,全部的對象都到齊,就進入下一輪順帶喚醒本輪的全部線程。這樣就達到了這個類的目的了。
AQS的使用很簡單,只須要實現2中所說的那幾個方法就能夠了。同步器以state做爲判斷依據,怎麼定義state就看相關人員怎麼設計了,同步器採用了模板方法設計模式,實現了排他鎖和共享鎖的相關邏輯,只須要實現方法,判斷是否須要阻塞添加到等待隊列便可,全部其餘過程均已被封裝。
對於封裝的方法不理解,能夠參考這篇文章,可能更詳細一點:這裏。
本文主要仍是講了一下JDK中幾個用到了AQS的類,他們的特色以及實現過程,旨在學習如何使用AQS定義出本身想要的同步工具。