AbstractQueuedSynchronizer AQS框架源碼剖析

1、引子

Java.util.concurrent包都是Doug Lea寫的,來混個眼熟html

是的,就是他,提出了JSR166(Java Specification RequestsJava 規範提案),該規範的核心就是AbstractQueuedSynchronizer同步器框架(AQS)。這個框架爲構造同步器提供一種通用的機制,而且被j.u.c包中大部分類使用。java

包結構以下圖,其中AbstractOwnableSynchronizer是其父類,而AbstractQueuedLongSynchronizer是其32位狀態的升級版64位的實現,適用於多級屏障(CyclicBarrier)。node

AQS的繼承關係以下圖,可見老李頭對它多重視了。老李頭的論文解析飛機票:《The java.util.concurrent Synchronizer Framework》 JUC同步器框架(AQS框架)原文翻譯 安全

2、AQS架構設計原理

2.1 需求分析

爲了使框架能獲得普遍應用,AQS同步器定義兩種資源共享方式:數據結構

Exclusive:獨佔模式,同時只有一個線程能執行,如ReentrantLock多線程

Share:共享模式,多個線程可同時執行,如Semaphore/CountDownLatch。架構

  通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。併發

  自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現如下幾種方法:框架

  • isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。 

  AQS爲了實現上述操做,須要下面三個基本組件的相互協做:函數

  • 同步狀態的原子性管理
  • 線程的阻塞與解除阻塞
  • 隊列的管理

2.2 同步狀態的原子性管理

state字段, 用於同步線程之間的共享狀態。經過 CAS 和 volatile 保證其原子性和可見性。

以下圖:

1)volatile修飾state:

線程內的工做內存修改數據後會強制刷新到主存中去,且使其餘線程中的工做內存中的該變量失效,下次只能從主存讀取。實現了多線程數據可見性。

2)CAS操做state:

unsafe.compareAndSwapInt(this, stateOffset, expect, update); 根據對象的state同步狀態偏移量是否和expect值相同,相同則更新。標準的CAS操做。unsafe飛機票:在openjdk8下看Unsafe源碼

 

2.3 線程的阻塞與解除阻塞

利用LockSupport.park() 和 LockSupport.unpark() 實現線程的阻塞和喚醒(底層調用Unsafe的native park和unpark實現),同時支持超時時間。

 

2.4 隊列的管理

  根據論文裏描述, AQS 裏將阻塞線程封裝到一個內部類 Node 裏。並維護一個 CLH Node FIFO 隊列。 CLH隊列是一個非阻塞的 FIFO 隊列,也就是說往裏面插入或移除一個節點的時候,在併發條件下不會阻塞,而是經過自旋鎖和 CAS 保證節點插入和移除的原子性。AQS裏的CLH是一個雙向鏈表,數據結構以下圖:

 

node數據結構,後續加上。

 

3、AQS源碼實現

本節開始講解AQS的源碼實現。依照acquire-release、acquireShared-releaseShared的次序來。

3.1 acquire(int)獨佔模式獲取資源

  此方法是獨佔模式下線程獲取共享資源的頂層入口。若是獲取到資源,線程直接返回,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。這也正是lock()的語義,固然不只僅只限於lock()。獲取到資源後,線程就能夠去執行其臨界區代碼了。下面是acquire()的源碼:

1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3  acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4  selfInterrupt(); 5 }

 

  函數流程以下:

    1. tryAcquire()嘗試直接去獲取資源,若是成功則直接返回;
    2. addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;
    3. acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
    4. 若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。

  這時單憑這4個抽象的函數來看流程還有點朦朧,沒關係,看完接下來的分析後,你就會明白了。就像《大話西遊》裏唐僧說的:等你明白了捨生取義的道理,你天然會回來和我唱這首歌的。

3.1.1 tryAcquire(int)

  此方法嘗試去獲取獨佔資源。若是獲取成功,則直接返回true,不然直接返回false。這也正是tryLock()的語義,仍是那句話,固然不只僅只限於tryLock()。以下是tryAcquire()的源碼:

1     protected boolean tryAcquire(int arg) { 2 throw new UnsupportedOperationException(); 3 }

 

  什麼?直接throw異常?說好的功能呢?好吧,還記得概述裏講的AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現嗎?就是這裏了!!!AQS這裏只定義了一個接口,具體資源的獲取交由自定義同步器去實現了(經過state的get/set/CAS)!!!至於能不能重入,能不能加塞,那就看具體的自定義同步器怎麼去設計了!!!固然,自定義同步器在進行資源訪問時要考慮線程安全的影響。

  這裏之因此沒有定義成abstract,是由於獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。若是都定義成abstract,那麼每一個模式也要去實現另外一模式下的接口。說到底,Doug Lea仍是站在我們開發者的角度,儘可能減小沒必要要的工做量。

3.1.2 addWaiter(Node)

  此方法用於將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點。仍是上源碼吧:

複製代碼
 1 private Node addWaiter(Node mode) {  2 //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)  3 Node node = new Node(Thread.currentThread(), mode);  4  5 //嘗試快速方式直接放到隊尾。  6 Node pred = tail;  7 if (pred != null) {  8 node.prev = pred;  9 if (compareAndSetTail(pred, node)) { 10 pred.next = node; 11 return node; 12  } 13  } 14 15 //上一步失敗則經過enq入隊。 16  enq(node); 17 return node; 18 }
複製代碼

 不用再說了,直接看註釋吧。

3.1.2.1 enq(Node)

   此方法用於將node加入隊尾。源碼以下:

複製代碼
 1 private Node enq(final Node node) {  2 //CAS"自旋",直到成功加入隊尾  3 for (;;) {  4 Node t = tail;  5 if (t == null) { // 隊列爲空,建立一個空的標誌結點做爲head結點,並將tail也指向它。  6 if (compareAndSetHead(new Node()))  7 tail = head;  8 } else {//正常流程,放入隊尾  9 node.prev = t; 10 if (compareAndSetTail(t, node)) { 11 t.next = node; 12 return t; 13  } 14  } 15  } 16 }
複製代碼

 

若是你看過AtomicInteger.getAndIncrement()函數源碼,那麼相信你一眼便看出這段代碼的精華。CAS自旋volatile變量,是一種很經典的用法。還不太瞭解的,本身去百度一下吧。

3.1.3 acquireQueued(Node, int)

  OK,經過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。聰明的你馬上應該能想到該線程下一部該幹什麼了吧:進入等待狀態休息,直到其餘線程完全釋放資源後喚醒本身,本身再拿到資源,而後就能夠去幹本身想幹的事了。沒錯,就是這樣!是否是跟醫院排隊拿號有點類似~~acquireQueued()就是幹這件事:在等待隊列中排隊拿號(中間沒其它事幹能夠休息),直到拿到號後再返回。這個函數很是關鍵,仍是上源碼吧:

複製代碼
 1 final boolean acquireQueued(final Node node, int arg) {  2 boolean failed = true;//標記是否成功拿到資源  3 try {  4 boolean interrupted = false;//標記等待過程當中是否被中斷過  5  6 //又是一個「自旋」!  7 for (;;) {  8 final Node p = node.predecessor();//拿到前驅  9 //若是前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源(多是老大釋放完資源喚醒本身的,固然也可能被interrupt了)。 10 if (p == head && tryAcquire(arg)) { 11 setHead(node);//拿到資源後,將head指向該結點。因此head所指的標杆結點,就是當前獲取到資源的那個結點或null。 12 p.next = null; // setHead中node.prev已置爲null,此處再將head.next置爲null,就是爲了方便GC回收之前的head結點。也就意味着以前拿完資源的結點出隊了! 13 failed = false; 14 return interrupted;//返回等待過程當中是否被中斷過 15  } 16 17 //若是本身能夠休息了,就進入waiting狀態,直到被unpark() 18 if (shouldParkAfterFailedAcquire(p, node) && 19  parkAndCheckInterrupt()) 20 interrupted = true;//若是等待過程當中被中斷過,哪怕只有那麼一次,就將interrupted標記爲true 21  } 22 } finally { 23 if (failed) 24  cancelAcquire(node); 25  } 26 }
複製代碼

 

到這裏了,咱們先不急着總結acquireQueued()的函數流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。

3.1.3.1 shouldParkAfterFailedAcquire(Node, Node)

  此方法主要用於檢查狀態,看看本身是否真的能夠去休息了(進入waiting狀態,若是線程狀態轉換不熟,能夠參考本人上一篇寫的Thread詳解),萬一隊列前邊的線程都放棄了只是瞎站着,那也說不定,對吧!

複製代碼
 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  2 int ws = pred.waitStatus;//拿到前驅的狀態  3 if (ws == Node.SIGNAL)  4 //若是已經告訴前驅拿完號後通知本身一下,那就能夠安心休息了  5 return true;  6 if (ws > 0) {  7 /*  8  * 若是前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。  9  * 注意:那些放棄的結點,因爲被本身「加塞」到它們前邊,它們至關於造成一個無引用鏈,稍後就會被保安大叔趕走了(GC回收)! 10 */ 11 do { 12 node.prev = pred = pred.prev; 13 } while (pred.waitStatus > 0); 14 pred.next = node; 15 } else { 16 //若是前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號後通知本身一下。有可能失敗,人家說不定剛剛釋放完呢! 17  compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 18  } 19 return false; 20 }
複製代碼

 

整個流程中,若是前驅結點的狀態不是SIGNAL,那麼本身就不能安心去休息,須要去找個安心的休息點,同時能夠再嘗試下看有沒有機會輪到本身拿號。

3.1.3.2 parkAndCheckInterrupt()

  若是線程找好安全休息點後,那就能夠安心去休息了。此方法就是讓線程去休息,真正進入等待狀態。

1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this);//調用park()使線程進入waiting狀態 3 return Thread.interrupted();//若是被喚醒,查看本身是否是被中斷的。 4 }

   park()會讓當前線程進入waiting狀態。在此狀態下,有兩種途徑能夠喚醒該線程:1)被unpark();2)被interrupt()。(再說一句,若是線程狀態轉換不熟,能夠參考本人寫的Thread詳解)。須要注意的是,Thread.interrupted()會清除當前線程的中斷標記位。 

3.1.3.3 小結

  OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),如今讓咱們再回到acquireQueued(),總結下該函數的具體流程:

  1. 結點進入隊尾後,檢查狀態,找到安全休息點;
  2. 調用park()進入waiting狀態,等待unpark()或interrupt()喚醒本身;
  3. 被喚醒後,看本身是否是有資格能拿到號。若是拿到,head指向當前結點,並返回從入隊到拿到號的整個過程當中是否被中斷過;若是沒拿到,繼續流程1。

 

3.1.4 小結

  OKOK,acquireQueued()分析完以後,咱們接下來再回到acquire()!再貼上它的源碼吧:

1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3  acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4  selfInterrupt(); 5 }

再來總結下它的流程吧:

  1. 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,若是成功則直接返回;
  2. 沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記爲獨佔模式;
  3. acquireQueued()使線程在等待隊列中休息,有機會時(輪到本身,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
  4. 若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。

因爲此函數是重中之重,我再用流程圖總結一下:

至此,acquire()的流程終於算是告一段落了。這也就是ReentrantLock.lock()的流程,不信你去看其lock()源碼吧,整個函數就是一條acquire(1)!!!

 

3.2 release(int)獨佔模式釋放資源

   上一小節已經把acquire()說完了,這一小節就來說講它的反操做release()吧。此方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。這也正是unlock()的語義,固然不只僅只限於unlock()。下面是release()的源碼:

複製代碼
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head;//找到頭結點 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h);//喚醒等待隊列裏的下一個線程 6 return true; 7  } 8 return false; 9 }
複製代碼

 

  邏輯並不複雜。它調用tryRelease()來釋放資源。有一點須要注意的是,它是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!因此自定義同步器在設計tryRelease()的時候要明確這一點!!

3.2.1 tryRelease(int)

  此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:

1 protected boolean tryRelease(int arg) { 2 throw new UnsupportedOperationException(); 3 }

 

  跟tryAcquire()同樣,這個方法是須要獨佔模式的自定義同步器去實現的。正常來講,tryRelease()都會成功的,由於這是獨佔模式,該線程來釋放資源,那麼它確定已經拿到獨佔資源了,直接減掉相應量的資源便可(state-=arg),也不須要考慮線程安全的問題。但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!因此自義定同步器在實現時,若是已經完全釋放資源(state=0),要返回true,不然返回false。

3.2.2 unparkSuccessor(Node)

  此方法用於喚醒等待隊列中下一個線程。下面是源碼:

複製代碼
 1 private void unparkSuccessor(Node node) {  2 //這裏,node通常爲當前線程所在的結點。  3 int ws = node.waitStatus;  4 if (ws < 0)//置零當前線程所在的結點狀態,容許失敗。  5 compareAndSetWaitStatus(node, ws, 0);  6  7 Node s = node.next;//找到下一個須要喚醒的結點s  8 if (s == null || s.waitStatus > 0) {//若是爲空或已取消  9 s = null; 10 for (Node t = tail; t != null && t != node; t = t.prev) 11 if (t.waitStatus <= 0)//從這裏能夠看出,<=0的結點,都是還有效的結點。 12 s = t; 13  } 14 if (s != null) 15 LockSupport.unpark(s.thread);//喚醒 16 }
複製代碼

 

  這個函數並不複雜。一句話歸納:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裏咱們也用s來表示吧。此時,再和acquireQueued()聯繫起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即便p!=head也不要緊,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裏既然s已是等待隊列中最前邊的那個未放棄線程了,那麼經過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),而後s把本身設置成head標杆結點,表示本身已經獲取到資源了,acquire()也返回了!!And then, DO what you WANT!

3.2.3 小結

  release()是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。

3.3 acquireShared(int)共享模式獲取資源

  此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。下面是acquireShared()的源碼:

1 public final void acquireShared(int arg) { 2 if (tryAcquireShared(arg) < 0) 3  doAcquireShared(arg); 4 }

 

  這裏tryAcquireShared()依然須要自定義同步器去實現。可是AQS已經把其返回值的語義定義好了:負值表明獲取失敗;0表明獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其餘線程還能夠去獲取。因此這裏acquireShared()的流程就是:

    1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
    2. 失敗則經過doAcquireShared()進入等待隊列,直到獲取到資源爲止才返回。

3.3.1 doAcquireShared(int)

  此方法用於將當前線程加入等待隊列尾部休息,直到其餘線程釋放資源喚醒本身,本身成功拿到相應量的資源後才返回。下面是doAcquireShared()的源碼:

複製代碼
 1 private void doAcquireShared(int arg) {  2 final Node node = addWaiter(Node.SHARED);//加入隊列尾部  3 boolean failed = true;//是否成功標誌  4 try {  5 boolean interrupted = false;//等待過程當中是否被中斷過的標誌  6 for (;;) {  7 final Node p = node.predecessor();//前驅  8 if (p == head) {//若是到head的下一個,由於head是拿到資源的線程,此時node被喚醒,極可能是head用完資源來喚醒本身的  9 int r = tryAcquireShared(arg);//嘗試獲取資源 10 if (r >= 0) {//成功 11 setHeadAndPropagate(node, r);//將head指向本身,還有剩餘資源能夠再喚醒以後的線程 12 p.next = null; // help GC 13 if (interrupted)//若是等待過程當中被打斷過,此時將中斷補上。 14  selfInterrupt(); 15 failed = false; 16 return; 17  } 18  } 19 20 //判斷狀態,尋找安全點,進入waiting狀態,等着被unpark()或interrupt() 21 if (shouldParkAfterFailedAcquire(p, node) && 22  parkAndCheckInterrupt()) 23 interrupted = true; 24  } 25 } finally { 26 if (failed) 27  cancelAcquire(node); 28  } 29 }
複製代碼

 

  有木有以爲跟acquireQueued()很類似?對,其實流程並無太大區別。只不過這裏將補中斷的selfInterrupt()放到doAcquireShared()裏了,而獨佔模式是放到acquireQueued()以外,其實都同樣,不知道Doug Lea是怎麼想的。

  跟獨佔模式比,還有一點須要注意的是,這裏只有線程是head.next時(「老二」),纔會去嘗試獲取資源,有剩餘的話還會喚醒以後的隊友。那麼問題就來了,假如老大用完後釋放了5個資源,而老二須要6個,老三須要1個,老四須要2個。由於老大先喚醒老二,老二一看資源不夠本身用繼續park(),也更不會去喚醒老三和老四了。獨佔模式,同一時刻只有一個線程去執行,這樣作何嘗不可;但共享模式下,多個線程是能夠同時執行的,如今由於老二的資源需求量大,而把後面量小的老三和老四也都卡住了。

 

3.3.1.1 setHeadAndPropagate(Node, int)

複製代碼
 1 private void setHeadAndPropagate(Node node, int propagate) {  2 Node h = head;  3 setHead(node);//head指向本身  4 //若是還有剩餘量,繼續喚醒下一個鄰居線程  5 if (propagate > 0 || h == null || h.waitStatus < 0) {  6 Node s = node.next;  7 if (s == null || s.isShared())  8  doReleaseShared();  9  } 10 }
複製代碼

 

  此方法在setHead()的基礎上多了一步,就是本身甦醒的同時,若是條件符合(好比還有剩餘資源),還會去喚醒後繼結點,畢竟是共享模式!

  doReleaseShared()咱們留着下一小節的releaseShared()裏來說。

 

3.3.2 小結

  OK,至此,acquireShared()也要告一段落了。讓咱們再梳理一下它的流程:

    1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
    2. 失敗則經過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()併成功獲取到資源才返回。整個等待過程也是忽略中斷的。

  其實跟acquire()的流程大同小異,只不過多了個本身拿到資源後,還會去喚醒後繼隊友的操做(這纔是共享嘛)

3.4 releaseShared()共享模式釋放資源

  上一小節已經把acquireShared()說完了,這一小節就來說講它的反操做releaseShared()吧。此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。下面是releaseShared()的源碼:

複製代碼
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) {//嘗試釋放資源 3 doReleaseShared();//喚醒後繼結點 4 return true; 5  } 6 return false; 7 }
複製代碼

 

  此方法的流程也比較簡單,一句話:釋放掉資源後,喚醒後繼。跟獨佔模式下的release()類似,但有一點稍微須要注意:獨佔模式下的tryRelease()在徹底釋放掉資源(state=0)後,纔會返回true去喚醒其餘線程,這主要是基於可重入的考量;而共享模式下的releaseShared()則沒有這種要求,一是共享的實質--多線程可併發執行;二是共享模式基本也不會重入吧(至少我還沒見過),因此自定義同步器能夠根據須要決定返回值。

3.4.1 doReleaseShared()

  此方法主要用於喚醒後繼。下面是它的源碼:

複製代碼
 1 private void doReleaseShared() {  2 for (;;) {  3 Node h = head;  4 if (h != null && h != tail) {  5 int ws = h.waitStatus;  6 if (ws == Node.SIGNAL) {//若是頭結點狀態是signal,即須要喚醒後繼節點  7 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//CAS一下若是當前狀態是signal則重置爲0,不然退出當前循環進入下次循環  8 continue;  9 unparkSuccessor(h);//喚醒後繼 10  } 11 else if (ws == 0 && 12 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//若是頭結點狀態是0且CAS成功狀態重置爲傳播失敗了,退出當前循環進入下次循環 13 continue; 14  } 15 if (h == head)// head發生變化 16 break; 17  } 18 }
複製代碼

 

3.5 小結

  本節咱們詳解了獨佔和共享兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,相信你們都有必定認識了。值得注意的是,acquire()和acquireSahred()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支持響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()便是,這裏相應的源碼跟acquire()和acquireSahred()差很少,這裏就再也不詳解了。

 

 4、簡單應用

下面咱們就以AQS源碼裏的Mutex爲例,講一下AQS的簡單應用。

同步類本身(Mutex)則實現某個接口,對外服務。同步類在實現時通常都將自定義同步器(sync)定義爲內部類,只用實現state的獲取-釋放方式tryAcquire-tryRelelase,至於線程的排隊、等待、喚醒等,上層的AQS都已經實現好了,咱們不用關心。

 1 class Mutex implements Lock, java.io.Serializable {
 2     // 自定義同步器
 3     private static class Sync extends AbstractQueuedSynchronizer {
 4         // 判斷是否鎖定狀態
 5         protected boolean isHeldExclusively() {
 6             return getState() == 1;
 7         }
 8 
 9         // 嘗試獲取資源,當即返回。成功則返回true,不然false。
10         public boolean tryAcquire(int acquires) {
11             assert acquires == 1; // 這裏限定只能爲1個量
12             if (compareAndSetState(0, 1)) {//state爲0才設置爲1,不可重入!
13                 setExclusiveOwnerThread(Thread.currentThread());//設置爲當前線程獨佔資源
14                 return true;
15             }
16             return false;
17         }
18 
19         // 嘗試釋放資源,當即返回。成功則爲true,不然false。
20         protected boolean tryRelease(int releases) {
21             assert releases == 1; // 限定爲1個量
22             if (getState() == 0)//既然來釋放,那確定就是已佔有狀態了。只是爲了保險,多層判斷!
23                 throw new IllegalMonitorStateException();
24             setExclusiveOwnerThread(null);
25             setState(0);//釋放資源,放棄佔有狀態
26             return true;
27         }
28     }
29 
30     // 真正同步類的實現都依賴繼承於AQS的自定義同步器!
31     private final Sync sync = new Sync();
32 
33     //lock<-->acquire。二者語義同樣:獲取資源,即使等待,直到成功才返回。
34     public void lock() {
35         sync.acquire(1);
36     }
37 
38     //tryLock<-->tryAcquire。二者語義同樣:嘗試獲取資源,要求當即返回。成功則爲true,失敗則爲false。
39     public boolean tryLock() {
40         return sync.tryAcquire(1);
41     }
42 
43     //unlock<-->release。二者語文同樣:釋放資源。
44     public void unlock() {
45         sync.release(1);
46     }
47 
48     //鎖是否佔有狀態
49     public boolean isLocked() {
50         return sync.isHeldExclusively();
51     }
52 }

 5、總結

 

公共方法

子類須要自定義的方法(AQS中默認返回異常,子類覆蓋實現)

子類可直接使用的方法

獨佔模式

CAS操做節點、state同步狀態

compareAndSetState 設置同步狀態

compareAndSetHead 設置head節點
compareAndSetTail    設置tail節點

compareAndSetWaitStatus設置等待狀態
compareAndSetNext 設置下一個節點

 




protected boolean tryAcquire(int arg)獲取資源
protected boolean tryRelease(int arg) 釋放資源
protected boolean isHeldExclusively()該線程是否正在獨佔資源。

AbstractOwnableSynchronizer是AQS的父類,繼承AQS類天然繼承了AbstractOwnableSynchronizer,

方法:
protected final void setExclusiveOwnerThread(Thread thread)設置當前獨佔線程

protected final Thread getExclusiveOwnerThread()獲取當前獨佔線程

共享模式

protected int tryAcquireShared(int arg)獲取資源

protected boolean tryReleaseShared(int arg) 釋放資源

 

 

 瞭解了老李頭的AQS,再去看JUC下的類就簡單明瞭啦,以下:

1.獨佔模式

ReentrantLock:可重入鎖。state=0獨佔鎖,或者同一線程可屢次獲取鎖(獲取+1,釋放-1)。
Worker(java.util.concurrent.ThreadPoolExecutor類中的內部類)線程池類。shutdown關閉空閒工做線程,中斷worker工做線程是獨佔的,互斥的。

2.共享模式
Semaphore:信號量。 控制同時有多少個線程能夠進入代碼段。(互斥鎖的拓展)
CountDownLatch:倒計時器。  初始化一個值,多線程減小這個值,直到爲0,倒計時完畢,執行後續代碼。

3.獨佔+共享模式
ReentrantReadWriteLock:可重入讀寫鎖。獨佔寫+共享讀,即併發讀,互斥寫。

 

========參考=================

1.《The java.util.concurrent Synchronizer Framework》

2.http://singleant.iteye.com/blog/1418580

3.http://www.cnblogs.com/waterystone/p/4920797.html

相關文章
相關標籤/搜索