抽象隊列同步器(AbstractQueuedSynchronizer,簡稱AQS)是用來構建鎖或者其餘同步組件的基礎框架,它使用一個整型的volatile變量(命名爲state)來維護同步狀態,經過內置的FIFO隊列來完成資源獲取線程的排隊工做。html
volatile變量的讀寫和CAS是concurrent包得以實現的基礎。CAS表示若是當前狀態值等於預期值,則以原子方式將同步狀態設置爲給定的更新值,此操做具備volatile讀和寫的內存語義。AQS經過volatile的讀/寫和CAS所具備的volatile讀和寫的內存語義來實現線程之間的通訊。java
高層類 | Lock 同步器 阻塞隊列 Executor 併發容器 |
基礎類 | AQS 非阻塞數據結構 原子變量類 |
volatile變量的讀/寫 CAS |
concurrent包的實現結構如上圖所示,AQS、非阻塞數據結構和原子變量類等基礎類都是基於volatile變量的讀/寫和CAS實現,而像Lock、同步器、阻塞隊列、Executor和併發容器等高層類又是基於基礎類實現。node
1 private transient volatile Node head; //同步隊列的head節點 2 private transient volatile Node tail; //同步隊列的tail節點 3 private volatile int state; //同步狀態
AQS提供的能夠修改同步狀態的3個方法:編程
1 protected final int getState(); //獲取同步狀態 2 protected final void setState(int newState); //設置同步狀態 3 protected final boolean compareAndSetState(int expect, int update); //CAS設置同步狀態
AQS提供的模板方法,主要有如下三類:安全
1)獨佔式獲取和釋放同步狀態數據結構
1 public final void acquire(int arg) //獨佔式獲取同步狀態,若是不成功會進入同步隊列等待。 2 public final void acquireInterruptibly(int arg) //與acquire不一樣的是,能響應中斷 3 public final boolean tryAcquireNanos(int arg, long nanosTimeout) //增長超時機制 4 5 public final boolean release(int arg) //獨佔式釋放同步狀態,該方法會調用重寫的tryRelease(int arg)。
以上三種獲取同步狀態的方法都會調用自定義的tryAcquire(int arg)方法,acquire的獲取失敗時的入隊等待機制、acquireInterruptibly的響應中斷機制、tryAcquireNanos的超時機制等AQS已經實現好,這樣開發人員只須要實現本身的獲取同步狀態機制,就能夠大大下降了實現一個可靠自定義同步組件的門檻。併發
2)共享式獲取和釋放同步狀態app
1 public final void acquireShared(int arg) //共享式獲取同步狀態,若是不成功會進入同步隊列等待。與獨佔式不一樣的是,同一時刻能夠有多個線程獲取到同步狀態。 2 public final void acquireSharedInterruptibly(int arg) //可響應中斷 3 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) //超時機制 4 5 public final boolean releaseShared(int arg) //共享式釋放同步狀態,該方法會調用重寫的tryReleaseShared(int arg)。
一樣以上三種獲取同步狀態的方法會調用自定義的tryAcquireShared方法。框架
3)查詢同步隊列中的等待線程情ide
1 publicfinalCollection<Thread>getQueuedThreads() 2 publicfinalbooleanhasQueuedThreads()//返回包含可能正在等待獲取的線程列表,須要遍歷鏈表。返回的只是個估計值,且是無序的。這個方法的主要是爲子類提供的監視同步隊列措施而設計。 3 。。。
AQS提供的自定義方法
以上AQS的方法都爲final方法,不能被子類重寫,由於它們對於任何自定義同步器應該是不須要更改的,下面爲AQS提供的能夠重寫的方法。開發者須要根據自定義同步組件的特色,重寫如下方法。這些方法的實如今內部必須是線程安全的,一般應該很短而且不被阻塞。
1 protected boolean tryAcquire(int arg) //獨佔式獲取同步狀態,此方法應該查詢是否容許它在獨佔模式下獲取對象狀態,若是容許,則獲取它。返回值語義:true表明獲取成功,false表明獲取失敗。 2 protected boolean tryRelease(int arg) //獨佔式釋放同步狀態 3 4 protected int tryAcquireShared(int arg) //共享式獲取同步狀態,返回值語義:負數表明獲取失敗、0表明獲取成功但沒有剩餘資源、正數表明獲取成功,還有剩餘資源。 5 protected boolean tryReleaseShared(int arg) //共享式釋放同步狀態 6 7 protected boolean isHeldExclusively() //AQS是否被當前線程所獨佔
怎麼使用AQS實現自定義同步組件?
Mutex同步組件同一時刻只容許一個線程佔用鎖,不支持可重入。0表示未鎖定狀態,1表示鎖定狀態。
1 class Mutex implements Lock, java.io.Serializable { 2 3 //靜態內部類,自定義同步器 4 private static class Sync extends AbstractQueuedSynchronizer { 5 6 // 釋放處於佔用狀態(重寫isHeldExclusively)Report whether in locked state 7 protected boolean isHeldExclusively() { 8 return getState() == 1; 9 } 10 11 // 獨佔式獲取鎖(重寫tryAcquire) Acquire the lock if state is zero 12 public boolean tryAcquire(int acquires) { 13 assert acquires == 1; // Otherwise unused 14 if (compareAndSetState(0, 1)) { //CAS設置狀態爲1。 15 setExclusiveOwnerThread(Thread.currentThread()); 16 return true; 17 } 18 return false; 19 } 20 21 // 獨佔式釋放鎖(重寫tryRelease) Release the lock by setting state to zero 22 protected boolean tryRelease(int releases) { 23 assert releases == 1; // Otherwise unused 24 if (getState() == 0) //獲取狀態 25 throw new IllegalMonitorStateException(); 26 setExclusiveOwnerThread(null); 27 setState(0); //設置狀態爲0 28 return true; 29 } 30 31 // Provide a Condition 32 //每一個Condition都包含一個隊列 33 Condition newCondition() { return new ConditionObject(); } 34 35 // Deserialize properly 36 private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { 37 s.defaultReadObject(); 38 setState(0); // reset to unlocked state 39 } 40 } 41 42 // The sync object does all the hard work. We just forward to it. 43 private final Sync sync = new Sync(); 44 45 //僅須要將操做代理到sync 46 public void lock() { sync.acquire(1); } //調用AQS的模板方法, 47 public boolean tryLock() { return sync.tryAcquire(1); } 48 public void unlock() { sync.release(1); } 49 public Condition newCondition() { return sync.newCondition(); } 50 public boolean isLocked() { return sync.isHeldExclusively(); } 51 public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } 52 public void lockInterruptibly() throws InterruptedException { 53 sync.acquireInterruptibly(1); 54 } 55 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { 56 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); 57 } 58 }
BooleanLatch可用在多個線程須要等待某個事件發生才能繼續執行的狀況中。初始狀態state=0, 此時全部線程獲取同步狀態方法tryAcquireShared返回-1,即獲取失敗,入等待隊列。直到有線程調用tryReleaseShared釋放同步狀態,被阻塞的狀態纔會進行執行。
1 class BooleanLatch { 2 3 private static class Sync extends AbstractQueuedSynchronizer { 4 boolean isSignalled() { return getState() != 0; } 5 6 protected int tryAcquireShared(int ignore) { 7 return isSignalled()? 1 : -1; 8 } 9 10 protected boolean tryReleaseShared(int ignore) { 11 setState(1); 12 return true; 13 } 14 } 15 16 private final Sync sync = new Sync(); 17 public boolean isSignalled() { return sync.isSignalled(); } 18 public void signal() { sync.releaseShared(1); } 19 public void await() throws InterruptedException { 20 sync.acquireSharedInterruptibly(1); 21 } 22 }
同步隊列中的節點Node用來保存獲取同步狀態失敗的線程引用、等待狀態、以及前驅和後繼節點。
AQS中兩個域:head節點和tail節點,組成一個FIFO的雙向隊列。
private transient volatile Node head;
private transient volatile Node tail;
Node源碼以下。
1 static final class Node { 2 /** Marker to indicate a node is waiting in shared mode */ 3 static final Node SHARED = new Node(); //共享方式 4 /** Marker to indicate a node is waiting in exclusive mode */ 5 static final Node EXCLUSIVE = null; //獨佔方式 6 7 /** waitStatus value to indicate thread has cancelled */ 8 static final int CANCELLED = 1; //waitStatus=1爲取消狀態 9 /** waitStatus value to indicate successor's thread needs unparking */ 10 static final int SIGNAL = -1; //後繼節點的線程處於等待狀態,須要被喚醒。 11 /** waitStatus value to indicate thread is waiting on condition */ 12 static final int CONDITION = -2; //當前線程在condition上等待 13 static final int PROPAGATE = -3; //表示下一次共享式同步狀態獲取將會無條件的被傳播下去。 14 15 volatile int waitStatus; //等待狀態,0-初始狀態 16 volatile Node prev; //前驅節點 17 volatile Node next; //後繼節點 18 volatile Thread thread; //獲取同步的線程 19 Node nextWaiter; 20 21 final boolean isShared() { 22 return nextWaiter == SHARED; 23 } 24 //返回前驅節點 25 final Node predecessor() throws NullPointerException { 26 Node p = prev; 27 if (p == null) 28 throw new NullPointerException(); 29 else 30 return p; 31 } 32 33 Node() { // Used to establish initial head or SHARED marker 34 } 35 36 Node(Thread thread, Node mode) { // Used by addWaiter 37 this.nextWaiter = mode; 38 this.thread = thread; 39 } 40 41 Node(Thread thread, int waitStatus) { // Used by Condition 42 this.waitStatus = waitStatus; 43 this.thread = thread; 44 } 45 }
1)調用自定義的tryAcquire方法,該方法要保證線程線程安全的獲取同步狀態(如Mutex中的tryAcquire使用CAS更新保證原子性),若是獲取成功則return。
2)若是獲取失敗,構造Node,並經過addWaiter方法將節點插入隊列尾部。Node.EXCLUSIVE表示節點以獨佔方式等待。
3)acquireQueued方法中該節點自旋方式嘗試獲取同步狀態。若是獲取不到同步狀態,則阻塞節點中的線程,被阻塞線程喚醒依靠前驅節點的出隊或阻塞線程被終端來實現。
1 //該方法對中斷不敏感,也就是因爲線程獲取同步狀態失敗後進入同步隊列中,後續對線程進行中斷操做時,線程不會從同步隊列中移出. 2 public final void acquire(int arg) { 3 if (!tryAcquire(arg) && 4 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 5 selfInterrupt(); 6 } 7 8 //將節點添加到隊尾 9 private Node addWaiter(Node mode) { 10 Node node = new Node(Thread.currentThread(), mode); 11 // Try the fast path of enq; backup to full enq on failure 12 //快速嘗試入隊,若是失敗則須要調用enq(node)方法入隊,這樣作有什麼好處?有必定機率減小一次方法調用 13 //compareAndSetTail保證Node入隊是線程安全的 14 Node pred = tail; 15 if (pred != null) { 16 node.prev = pred; 17 if (compareAndSetTail(pred, node)) { 18 pred.next = node; 19 return node; 20 } 21 } 22 enq(node); 23 return node; 24 } 25 26 //初始化或自旋CAS直到入隊成功 27 private Node enq(final Node node) { 28 for (;;) { 29 Node t = tail; 30 if (t == null) { // Must initialize 31 if (compareAndSetHead(new Node())) 32 tail = head; 33 } else { 34 node.prev = t; 35 if (compareAndSetTail(t, node)) { 36 t.next = node; 37 return t; 38 } 39 } 40 } 41 }
入同步隊列以後怎麼獲取同步狀態?阻塞機制是怎樣的?
1 //自旋方式嘗試獲取同步狀態 2 final boolean acquireQueued(final Node node, int arg) { 3 boolean failed = true; 4 try { 5 boolean interrupted = false; 6 for (;;) { 7 final Node p = node.predecessor(); //獲取當前節點的前驅節點 8 if (p == head && tryAcquire(arg)) { //若是前驅節點是head節點則嘗試獲取同步狀態 9 setHead(node); 10 p.next = null; // help GC 11 failed = false; 12 return interrupted; 13 } 14 if (shouldParkAfterFailedAcquire(p, node) && 15 parkAndCheckInterrupt()) //判斷當前線程是否應該被阻塞,若是是則阻塞直到被喚醒繼續循環,若是不是則再次嘗試獲取同步狀態。 16 interrupted = true; 17 } 18 } finally { 19 if (failed) 20 cancelAcquire(node); 21 } 22 } 23 24 //判斷當前線程是否應該被阻塞,若是線程應該被阻塞則返回true。檢查和更新獲取同步狀態失敗Node的前驅節點的waitStatus。 25 //其中pred爲node的前驅節點 26 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 27 int ws = pred.waitStatus; 28 if (ws == Node.SIGNAL) 29 /* 30 * This node has already set status asking a release 31 * to signal it, so it can safely park. 32 */ 33 //前驅節點已經設置爲SIGNAL狀態,在前驅節點的線程釋放同步狀態會喚醒當前Node的線程。 34 return true; 35 if (ws > 0) { 36 /* 37 * Predecessor was cancelled. Skip over predecessors and 38 * indicate retry. 39 */ 40 //前驅節點是cancelled狀態,跳過被取消的Node,直到向前找到waitStatus > 0的Node做爲當前節點的前驅,而後重試獲取同步狀態。 41 do { 42 node.prev = pred = pred.prev; 43 } while (pred.waitStatus > 0); 44 pred.next = node; 45 } else { 46 /* waitStatus = 0是初始狀態。 47 * waitStatus must be 0 or PROPAGATE. Indicate that we 48 * need a signal, but don't park yet. Caller will need to 49 * retry to make sure it cannot acquire before parking. 50 */ 51 //將前驅節點的等待狀態改成SIGNAL。 52 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 53 } 54 return false; 55 } 56 57 //阻塞線程 58 private final boolean parkAndCheckInterrupt() { 59 LockSupport.park(this); //使用LockSupport阻塞當前線程 60 return Thread.interrupted(); 61 }
在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成功獲取了同步狀態。
在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,而後喚醒頭節點的後繼節點。
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h); //喚醒後繼節點線程 6 return true; 7 } 8 return false; 9 } 10 11 //喚醒後繼節點 12 private void unparkSuccessor(Node node) { 13 /* 14 * If status is negative (i.e., possibly needing signal) try 15 * to clear in anticipation of signalling. It is OK if this 16 * fails or if status is changed by waiting thread. 17 */ 18 //將當前節點的waitStatus改成0-原始狀態,目的是什麼? 19 int ws = node.waitStatus; 20 if (ws < 0) 21 compareAndSetWaitStatus(node, ws, 0); 22 23 /* 24 * Thread to unpark is held in successor, which is normally 25 * just the next node. But if cancelled or apparently null, 26 * traverse backwards from tail to find the actual 27 * non-cancelled successor. 28 */ 29 //若是後繼節點爲null或被取消,則從tail向前找到正常的後繼節點 30 Node s = node.next; 31 if (s == null || s.waitStatus > 0) { 32 s = null; 33 for (Node t = tail; t != null && t != node; t = t.prev) 34 if (t.waitStatus <= 0) 35 s = t; 36 } 37 if (s != null) 38 LockSupport.unpark(s.thread); //喚醒後繼節點 39 }
1)首先調用自定義方法tryAcquireShared嘗試獲取同步狀態,至少調用一次tryAcquireShared方法,若是返回值>=0,則獲取成功,return;不然執行步驟2),
2)當獲取失敗時,爲當前線程以共享方式建立Node並插入同步隊列。
3)入隊後,以自旋方式嘗試獲取同步狀態,若是前驅節點爲head節點,則嘗試獲取同步狀態,獲取失敗,則阻塞線程。
1 //共享式獲取同步狀態,忽略異常。 2 //注意:實現自定義方法tryAcquireShared時,要遵循AQS定義的返回值語義,負數表明獲取失敗、0表明獲取成功但沒有剩餘資源、正數表明獲取成功,還有剩餘資源。 3 public final void acquireShared(int arg) { 4 if (tryAcquireShared(arg) < 0) // 5 doAcquireShared(arg); 6 } 7 8 private void doAcquireShared(int arg) { 9 final Node node = addWaiter(Node.SHARED); //爲當前線程以共享方式建立Node並插入同步隊列尾部。 10 boolean failed = true; 11 try { 12 boolean interrupted = false; 13 for (;;) { 14 final Node p = node.predecessor(); 15 if (p == head) { //若是前驅節點爲head節點,則嘗試獲取同步狀態 16 int r = tryAcquireShared(arg); 17 if (r >= 0) { //獲取成功 18 setHeadAndPropagate(node, r); //設置node爲head節點,還有剩餘資源則繼續喚醒後繼的node 19 p.next = null; // help GC 20 if (interrupted) //若是等待過程當中被中斷過,則中斷當前線程 21 selfInterrupt(); //Thread.currentThread().interrupt(); 22 failed = false; 23 return; 24 } 25 } 26 //shouldParkAfterFailedAcquire方法判斷當前線程是否應該被阻塞,若是是則調用parkAndCheckInterrupt阻塞當前線程 27 if (shouldParkAfterFailedAcquire(p, node) && 28 parkAndCheckInterrupt()) 29 interrupted = true; 30 } 31 } finally { 32 if (failed) 33 cancelAcquire(node); 34 } 35 } 36 37 //獲取成功後,設置node爲head節點, 38 private void setHeadAndPropagate(Node node, int propagate) { 39 Node h = head; // Record old head for check below 40 setHead(node); //設置node爲head節點,所以node出隊 41 //若是還有剩餘資源,嘗試喚醒node節點的後繼節點 42 /* 43 * Try to signal next queued node if: 44 * Propagation was indicated by caller, 45 * or was recorded (as h.waitStatus) by a previous operation 46 * (note: this uses sign-check of waitStatus because 47 * PROPAGATE status may transition to SIGNAL.) 48 * and 49 * The next node is waiting in shared mode, 50 * or we don't know, because it appears null 51 * 52 * The conservatism in both of these checks may cause 53 * unnecessary wake-ups, but only when there are multiple 54 * racing acquires/releases, so most need signals now or soon 55 * anyway. 56 */ 57 if (propagate > 0 || h == null || h.waitStatus < 0) { 58 Node s = node.next; 59 if (s == null || s.isShared()) 60 doReleaseShared(); 61 } 62 } 63 64 //設置node爲head節點,所以node出隊 65 private void setHead(Node node) { 66 head = node; 67 node.thread = null; 68 node.prev = null; 69 }
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) { 3 doReleaseShared(); 4 return true; 5 } 6 return false; 7 } 8 9 //喚醒後繼節點線程並確保被傳播 10 private void doReleaseShared() { 11 /* 12 * Ensure that a release propagates, even if there are other 13 * in-progress acquires/releases. This proceeds in the usual 14 * way of trying to unparkSuccessor of head if it needs 15 * signal. But if it does not, status is set to PROPAGATE to 16 * ensure that upon release, propagation continues. 17 * Additionally, we must loop in case a new node is added 18 * while we are doing this. Also, unlike other uses of 19 * unparkSuccessor, we need to know if CAS to reset status 20 * fails, if so rechecking. 21 */ 22 for (;;) { 23 Node h = head; 24 if (h != null && h != tail) { 25 int ws = h.waitStatus; 26 if (ws == Node.SIGNAL) { 27 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 28 continue; // loop to recheck cases 29 unparkSuccessor(h); //喚醒後繼節點 30 } 31 else if (ws == 0 && 32 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 33 continue; // loop on failed CAS 34 } 35 if (h == head) // loop if head changed 36 break; 37 } 38 }
可響應中斷獲取與普通獲取的區別:當線程被中斷時,會當即返回,並拋出InterruptedException,執行cancelAcquire方法取消獲取同步狀態;而普通獲取只是將中斷標誌位置爲true,但線程依舊會阻塞在等待隊列中。
1 public final void acquireInterruptibly(int arg) 2 throws InterruptedException { 3 if (Thread.interrupted()) 4 throw new InterruptedException(); //拋出中斷異常 5 if (!tryAcquire(arg)) 6 doAcquireInterruptibly(arg); 7 } 8 9 private void doAcquireInterruptibly(int arg) 10 throws InterruptedException { 11 final Node node = addWaiter(Node.EXCLUSIVE); 12 boolean failed = true; 13 try { 14 for (;;) { 15 final Node p = node.predecessor(); 16 if (p == head && tryAcquire(arg)) { 17 setHead(node); 18 p.next = null; // help GC 19 failed = false; 20 return; 21 } 22 if (shouldParkAfterFailedAcquire(p, node) && 23 parkAndCheckInterrupt()) 24 throw new InterruptedException(); //惟一的區別,拋出中斷異常。而普通的獲取操做,只是將中斷標誌位置爲true。 25 } 26 } finally { 27 if (failed) 28 cancelAcquire(node); 29 } 30 }
如何取消線程?
1 private void cancelAcquire(Node node) { 2 // Ignore if node doesn't exist 3 if (node == null) 4 return; 5 6 node.thread = null; //將線程置爲null 7 8 // Skip cancelled predecessors 9 Node pred = node.prev; 10 while (pred.waitStatus > 0) //即waitStatus=1,爲cancelled狀態。跳過狀態爲取消的前驅節點 11 node.prev = pred = pred.prev; 12 13 // predNext is the apparent node to unsplice. CASes below will 14 // fail if not, in which case, we lost race vs another cancel 15 // or signal, so no further action is necessary. 16 Node predNext = pred.next; 17 18 // Can use unconditional write instead of CAS here. 19 // After this atomic step, other Nodes can skip past us. 20 // Before, we are free of interference from other threads. 21 node.waitStatus = Node.CANCELLED; //當前節點置爲cancelled狀態 22 23 // If we are the tail, remove ourselves. 24 if (node == tail && compareAndSetTail(node, pred)) { //若是當前節點爲tail節點,刪除該節點 25 compareAndSetNext(pred, predNext, null); 26 } else { 27 // If successor needs signal, try to set pred's next-link 28 // so it will get one. Otherwise wake it up to propagate. 29 // 30 int ws; 31 if (pred != head && 32 ((ws = pred.waitStatus) == Node.SIGNAL || 33 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 34 pred.thread != null) { 35 //若是前驅節點waitStatus爲SIGNAL或者CAS更新爲SIGNAL成功,則pred釋放同步狀態時會通知後繼節點 36 //而且pred.thread不爲null,則cas將pred的後繼節點置爲node.next, 37 Node next = node.next; 38 if (next != null && next.waitStatus <= 0) 39 compareAndSetNext(pred, predNext, next); 40 } else { 41 unparkSuccessor(node); //喚醒後繼節點 42 } 43 44 node.next = node; // help GC next指向本身,node從等待隊列中移除。 45 } 46 }
在響應中斷的基礎上增長了超時機制
1 public final boolean tryAcquireNanos(int arg, long nanosTimeout) 2 throws InterruptedException { 3 if (Thread.interrupted()) 4 throw new InterruptedException(); 5 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); 6 } 7 8 private boolean doAcquireNanos(int arg, long nanosTimeout) 9 throws InterruptedException { 10 long lastTime = System.nanoTime(); 11 final Node node = addWaiter(Node.EXCLUSIVE); 12 boolean failed = true; 13 try { 14 for (;;) { 15 final Node p = node.predecessor(); 16 if (p == head && tryAcquire(arg)) { 17 setHead(node); 18 p.next = null; // help GC 19 failed = false; 20 return true; 21 } 22 if (nanosTimeout <= 0) //判斷是否超時,nanosTimeout<=0表示超時,若是超時則return false. 23 return false; 24 if (shouldParkAfterFailedAcquire(p, node) && 25 nanosTimeout > spinForTimeoutThreshold) 26 LockSupport.parkNanos(this, nanosTimeout); //若是還剩餘的時間nanosTimeout>閾值(spinForTimeoutThreshold=1000納秒),則阻塞當前線程nanosTimeout納秒。當nanosTimeout<1000納秒時,則不阻塞當前線程,而是進入快速的自旋過程。緣由在於,很是短的超時等待沒法作到十分精確,若是這時再進行超時等待,相反會讓nanosTimeout的超時從總體上表現得反而不精確。所以,在超時很是短的場景下,同步器會進入無條件的快速自旋。 27 long now = System.nanoTime(); //當前時間 28 //lastTime表示上次喚醒時間,now - lastTime表示已經睡眠的時間 29 nanosTimeout -= now - lastTime; //nanosTimeout表示還剩餘的時間,nanosTimeout>0表示表示超時時間未到 30 lastTime = now; 31 if (Thread.interrupted()) 32 throw new InterruptedException(); //拋異常 33 } 34 } finally { 35 if (failed) 36 cancelAcquire(node); 37 } 38 }
總結,三種方式獲取同步狀態方式的對比,主要區別在於獲取同步狀態失敗時的處理邏輯:
acquire方法直接阻塞線程,不響應中斷,只是將中斷標記置爲true,但線程依舊會阻塞在等待隊列中。
acquireInterruptibly方法直接阻塞線程,響應中斷,當線程被中斷時,會當即返回,並拋出InterruptedException。
tryAcquireNanos方法將線程阻塞nanosTimeout秒,如何超時還未獲取到同步狀態,則返回。同時支持響應中斷。
1 public final void acquire(int arg) //獨佔式獲取同步狀態,若是不成功會進入同步隊列等待。 2 public final void acquireInterruptibly(int arg) //與acquire不一樣的是,能響應中斷 3 public final boolean tryAcquireNanos(int arg, long nanosTimeout) //增長超時機制
在阻塞和喚醒線程時,使用了LockSupport類。LockSupport提供了一系列阻塞和喚醒線程的公共方法。底層使用unsafe提供的方法實現。
1 void park() //阻塞當前線程,只有調用unpark或中斷才能從park方法中返回 2 void parkNanos(long nanos) //超時阻塞當前線程,超時則返回 3 void parkUntil(long deadline) //截止時間阻塞當前線程,直到deadline 4 5 void unpark(Thread thread) //喚醒處於阻塞狀態的線程 6 7 //jdk1.6中增長了如下方法,blocker表示當前線程要等待的對象。線程dump信息比使用park方法要多,方便問題排查和監控。 8 void park(Object blocker) 9 void parkNanos(Object blocker, long nanos) 10 void parkUntil(Object blocker, long deadline)
《Java併發編程的藝術》
《Java併發之AQS詳解》http://www.cnblogs.com/waterystone/p/4920797.html