目錄java
AQS是AbstractQueuedSynchronizer(抽象隊列同步器)的縮寫。它是多線程訪問共享資源的框架,ReentrantLock、CountDownLatch、Semaphore等都是基於它來實現的。node
從圖中能夠看到,有兩個關鍵的組成部分,一個是state(共享資源,也能夠理解爲資源佔用計數器),另外一個是FIFO隊列,用來保存須要得到共享資源的縣城,其head節點始終指向當前真在佔用共享資源的線程。多線程
進入等待隊列的線程會被封裝成一個Node。其主要成員以下:框架
class Node { //在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消該Node的結點,其結點的waitStatus爲CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。 static final int CANCELLED = 1; //值爲-1,被標識爲該等待喚醒狀態的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該後繼結點的線程執行。說白了,就是處於喚醒狀態,只要前繼結點釋放鎖,就會通知標識爲SIGNAL狀態的後繼結點的線程執行。 static final int SIGNAL = -1; //與Condition相關,該標識的結點處於等待隊列中,結點的線程等待在Condition上,當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。 static final int CONDITION = -2; //與共享模式相關,在共享模式中,該狀態標識結點的線程處於可運行狀態。 static final int PROPAGATE = -3; //節點的等待狀態,默認0狀態:值爲0,表明初始化狀態。 volatile int waitStatus; //前驅結點 volatile Node prev; //後驅節點 volatile Node next; //目標線程 volatile Thread thread; //獲取前驅結點 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } }
對於共享資源state的修改,除了提供普通的getter以外,還提供了一個原子操做compareAndSetState()。函數
AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。ui
AQS提供了幾個重要的方法,參數都是state的值:this
自定義同步器主要實現這些方法便可,其餘的工做AQS自己已經實現好了。線程
以ReentrantLock爲例,state初始化爲0,表示資源/鎖未被佔用。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是==可重入==的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。code
今天主要分析獨佔式下的acquire-release。blog
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
函數步驟以下:
下面逐個方法看。
1 protected boolean tryAcquire(int arg) { 2 throw new UnsupportedOperationException(); 3 }
該方法的實現僅僅是拋出一個異常。然而該方法正是須要自定義同步器重寫的方法,包括對state的操做。
因爲直接獲取資源失敗,該方法是將線程放到等待隊列尾部。
private Node addWaiter(Node mode) { //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //嘗試快速方式直接放到隊尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失敗則經過enq入隊。 enq(node); return node; }
其中compareAndSetTail(pred, node)是以原子的方式進行尾節點和當前線程節點的交換。
該方法是在快速加入尾節點失敗以後執行,目的也是將node加入隊尾。
1 private Node enq(final Node node) { 2 //"自旋",直到成功加入隊尾 3 for (;;) { 4 Node t = tail; 5 if (t == null) { // 隊列爲空,建立一個空的標誌結點做爲head結點,並將tail也指向它。 6 if (compareAndSetHead(new Node()))//原子設置頭節點 7 tail = head; 8 } else {//正常流程,放入隊尾 9 node.prev = t; 10 if (compareAndSetTail(t, node)) { 11 t.next = node; 12 return t; 13 } 14 } 15 } 16 }
經過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部「休息」,直到其餘線程完全釋放資源後喚醒本身,再去獲取共享資源。
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true;//標記是否成功拿到資源 3 try { 4 boolean interrupted = false;//標記等待過程當中是否被中斷過 5 6 //又是一個「自旋」! 7 for (;;) { 8 final Node p = node.predecessor();//拿到前驅 9 //若是前驅是head,則能夠去獲取資源 10 if (p == head && tryAcquire(arg)) { 11 setHead(node);//拿到資源後,將head指向該結點。因此head所指的標杆結點,就是當前獲取到資源的那個結點或null。 12 p.next = null; // setHead中node.prev已置爲null,此處再將原來的head.next置爲null,就是爲了方便GC回收之前的head結點。也就意味着以前拿完資源的結點出隊了! 13 failed = false; 14 return interrupted;//返回等待過程當中是否被中斷過 15 } 16 17 //判斷是否能夠休息,若是能夠,就進入waiting狀態,若是等待過程當中被中斷過,就將interrupted標記爲true 18 if (shouldParkAfterFailedAcquire(p, node) && 19 parkAndCheckInterrupt()) 20 interrupted = true; 21 } 22 } finally {//自旋過程當中超時或者被中斷則從隊列移除該節點 23 if (failed) 24 cancelAcquire(node); 25 } 26 }
此方法主要用於檢查狀態,看看本身是否能夠進入waiting狀態。
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus;//拿到前驅的等待狀態 3 if (ws == Node.SIGNAL) 4 //若是已經告訴前驅資源釋放後通知本身一下,那就能夠安心休息了 5 return true; 6 if (ws > 0) { 7 /* 8 * 若是前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。 9 * 注意:那些放棄的結點稍後就會被回收 10 */ 11 do { 12 node.prev = pred = pred.prev; 13 } while (pred.waitStatus > 0); 14 pred.next = node; 15 } else { 16 //若是前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它資源釋放後通知本身一下。 17 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 18 } 19 return false; 20 }
休眠而且檢查中斷。
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this);//調用本地方法park()使線程進入waiting狀態 3 return Thread.interrupted();//返回當前線程是否被中斷。 4 }
從隊列移除節點
private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;//跳過已經被取消的節點一直往前找,直到找到一個有效的節點,讓node的前驅結點指向該節點 Node predNext = pred.next;//獲取剛纔找到的前驅結點的後置節點 node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) {//若是當前node就是尾節點,就以原子方式把剛纔找到的前驅結點設置爲新的尾節點 compareAndSetNext(pred, predNext, null);//以原子的方式將上面設置爲新的尾節點的後置節點置爲null } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {//前驅結點不是頭節點並且成功設置了"信號狀態"的以後,就把它的後置節點指向即將要取消的node節點的後置節點 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node);//不然喚醒下一個須要獲取鎖的節點 } node.next = node; } }
當前節點是尾節點:
當前節點既不是尾節點也不是頭節點:
釋放資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head;//找到頭結點,即當前持有資源的線程對應的節點 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h);//喚醒等待隊列裏的下一個線程 6 return true; 7 } 8 return false; 9 }
此方法嘗試去釋放指定量的資源。須要自定義同步器本身實現。
1 protected boolean tryRelease(int arg) { 2 throw new UnsupportedOperationException(); 3 }
其實就是將state減去arg,若是已經完全釋放資源(state=0),要返回true,不然返回false。
此方法用於喚醒等待隊列中下一個線程。
1 private void unparkSuccessor(Node node) { 3 int ws = node.waitStatus; 4 if (ws < 0)//置0當前線程所在的結點狀態。 5 compareAndSetWaitStatus(node, ws, 0); 6 7 Node s = node.next;//找到下一個須要喚醒的結點s 8 if (s == null || s.waitStatus > 0) {//若是爲空或已取消 9 s = null; 10 for (Node t = tail; t != null && t != node; t = t.prev)//從隊列尾向前找,直到找到下一個距離node最近的有效節點 11 if (t.waitStatus <= 0)//從這裏能夠看出,<=0的結點,都是還有效的結點。 12 s = t; 13 } 14 if (s != null) 15 LockSupport.unpark(s.thread);//喚醒 16 }