java5以後的java.util.concurrent包是世界級併發大師Doug Lea的做品,裏面主要實現了java
今天咱們主要介紹AbstractQueuedSynchronizer這個能夠說是最核心的類,沒有之一。整個concurrent包裏,基本都直接或間接地用到了這個類。Doug Lea的這篇論文裏面講AQS的實現。node
首先,咱們來想象一下,一間屋裏有一個你們都想要獲得的會讓你很爽的東西(something which makes you so happy, e.g. W.C)。當有人進去把門關起來在獨佔享用的時候,其餘人就只能在外面排隊等待,既然在等待,你就不能總是去敲門說哎,好了沒有啊。總是這樣的話裏面的人就很不爽了,並且你能夠利用這點等待時間乾點別的,好比看看小說視頻背背單詞或者就乾脆椅子上睡覺,當前面獨佔的人爽完以後,就會出來講,啊,好爽,到大家了。而後你們可能按照排隊順序獲取或者你們瘋搶這個狀態,有可能一我的本身進去獨佔,有可能幾我的說,哎不要緊,咱們能夠一塊兒來。而後他們進去爽,爽完以後再出來通知下一個。併發
AQS是一個abstract class,能夠經過繼承AQS,定義state的含義,以及tryAcquire,tryRelease,以及對應的share模式下tryAcquireShared,tryReleaseShared這幾個方法,定義出本身想要的同步子(Synchronizers)。通常而言,是定義一個內部類Sync extends AQS,實現前面說的幾個方法,而後再包一層,暴露出相應的方法。這樣作的好處是你能夠在包裝器類裏面取更直觀的名字,如ReentrantLock裏的lock,unlock和CountDownLatch裏的countDown,await,而不是太通用的acquire和release等。並且AQS裏面一些方法是爲了監控和調試使用,直接暴露出來也很差。函數
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }
/** Synchronizer providing all implementation mechanics */ private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {
定義了一個內部類,基本任務都代理給sync完成。而Sync又是一個abstract class,這裏主要是由於實現了兩種搶佔鎖的機制,公平鎖和非公平鎖。
static final class FairSync extends Sync static final class NonfairSync extends Sync
public ReentrantLock() { sync = new NonfairSync();//默認非公平鎖,AQS論文說非公平鎖效率高些,理由其實很簡單,公平鎖通知隊列第一個節點,要把它喚醒,而喚醒是須要時間的,在鎖釋放到第一個節點被喚醒這段時間其實鎖是能夠用可是沒有被用的(available but not used);而非公平鎖,釋放了以後立馬就能夠被別人用,因此提升了效率,可是有可能致使飢餓鎖,這個就要具體看業務需求了。 } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();//指定公平與否 }
public void lock() { sync.lock(); }
final void lock() { acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//0表示鎖沒有被線程用,1表示已經有線程佔用 if (!hasQueuedPredecessors() && //判斷本身是不是第一個節點,實現公平 compareAndSetState(0, acquires)) {//CAS更新狀態 setExclusiveOwnerThread(current);//設置當前線程擁有狀態 return true; } } else if (current == getExclusiveOwnerThread()) {//1表示已經有線程佔用,再判斷一下是否被當前線程佔用,來實現重入(Reentrant)特性 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc);//更新狀態 return true; } return false; }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//用當前線程構造Node,獨佔模式 // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//快速判斷,CAS更新tail節點 = node; return node; } } enq(node);//若是失敗,進入enq方法 return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node()))//若是尚未head,CAS初始化一個head tail = head; } else {//這段代碼跟addWaiter裏同樣,CAS更新tail節點 node.prev = t; if (compareAndSetTail(t, node)) { = node; return t; } } } }
如今咱們已經將獲取不到鎖的線程加入隊尾了,如今要將它掛起acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//在一個死循環中,避免假喚醒 final Node p = node.predecessor();//獲取當前節點的前一個節點,若是是head說明本身是第一個能夠獲取資源的線程,實現公平 if (p == head && tryAcquire(arg)) {//是第一個能夠獲取資源的線程而且嘗試獲取成功 setHead(node); = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//沒有獲取到資源,睡眠park去 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
public void unlock() { sync.release(1);//代理給sync,調用AQS的release } //下面代碼在AQS中 public final boolean release(int arg) { if (tryRelease(arg)) {//嘗試釋放資源,須要在子類裏實現 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//通知下一個節點 return true; } return false; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 主要在這裏找到下一個須要通知的節點,若是node.next就是須要通知的節點,則直接通知;不然,可能 == null(緣由是雙向鏈表設置b.pre = a和 = b的時候不能保證原子性,只能保證b.pre = a成功,這時候另外一條線程可能看到 == null)或者s.waitStatus > 0(緣由是線程等不及被取消了static final int CANCELLED = 1;),這個時候就要從隊尾tail開始找,找到離隊頭head最近的一個須要通知的節點Node。 Node s =; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒線程 }
protected final boolean tryRelease(int releases) { int c = getState() - releases;//釋放鎖,state減去相應的值 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();//避免A線程鎖了以後,B線程故意搗亂釋放鎖 boolean free = false; if (c == 0) {//當前線程已經徹底釋放了鎖 free = true; setExclusiveOwnerThread(null);//釋放鎖的擁有者 } setState(c);//設置狀態,這個方面沒有同步,沒有CAS,有同窗問過豈不是有線程併發問題?其實到這裏,只有一個線程會調用這個方法,因此不會有併發錯誤,仔細想一想,是吧?是吧? return free; }
final void lock() { if (compareAndSetState(0, 1))//先搶一把(插隊),萬一成功了就不排隊,不公平性就體如今這裏! setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);//調用父類方法nonfairTryAcquire } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) {//跟FairSync.tryAcquire只有這裏一行有差別,即少了!hasQueuedPredecessors(),也就是說不判斷前面有沒有人,任什麼時候候只要它醒來,都會去搶,因此不公平!============剛又看了一遍,發現其實final boolean acquireQueued(final Node node, int arg)方法裏已經有node.predecessor() == head的判斷,感受這個不公平的tryAcquire貌似沒有意義,各位看官怎麼看呢,請留言哈,謝謝~ setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
package concurrentStudy; import java.util.concurrent.CountDownLatch; /** * Created by magicalli on 2014/12/13. */ public class IndexPlusPlusTest01 { private static final int NThreads = 10;// 線程數 private static final int M = 100000;//循環次數,過小的話(好比10)可能看不出來效果 private volatile static int n = 0;//加volatile的目的是爲了證實volatile沒有「原子性」! public static void main(String[] args) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(NThreads); for (int i = 0; i < NThreads; i++) { new Thread(new Runnable() { @Override public void run() { try { startGate.await();//全部線程start以後等待「門「打開,保證同時真正開始運行 } catch (InterruptedException e) { e.printStackTrace(); } for (int j = 0; j < M; j++) { n += 1; } endGate.countDown(); } }).start(); } startGate.countDown();//打開「門」,讓全部線程同時run起來 long t1 = System.currentTimeMillis(); endGate.await();//等全部線程都結束以後纔打印n,不然老是會打出錯誤的n;我見過這裏用Thread.sleep(),可是問題在於,你怎麼知道該等多久才能保證全部線程結束以及恰好結束呢?! long t2 = System.currentTimeMillis(); System.out.println("cost time: " + (t2 - t1)); System.out.println("n: " + n); } }
private static final class Sync extends AbstractQueuedSynchronizer
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException();//響應中斷 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;//若是state爲0,說明全部Thread完成任務,能夠不阻塞了 }
若是沒有獲取到,將Thread加入隊尾,掛起。下面這個方法跟獨佔模式下acquireQueued(addWaiter(Node.EXCLUSIVE), arg))這個方法代碼是基本一致的。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//共享模式 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r);//跟EXCLUSIVE的一大區別 = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException();//響應中斷,這裏直接拋異常 } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ // 若是當前節點是願意共享,而且下一個節點也是願意共享的,那麼就進入doReleaseShared,喚醒下一個節點,下面會詳解 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s =; if (s == null || s.isShared()) doReleaseShared(); } }
public void countDown() { sync.releaseShared(1);//調用AQS的 } // AQS中 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//嘗試釋放,須要在子類中實現 doReleaseShared();//真正釋放 return true; } return false; } // Sync子類中實現 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc))// 在死循環中CAS將count-1 return nextc == 0; } } // AQS中 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h);//遍歷queue,通知全部SHARED的節點,由於是共享模式,這些Node都應該被喚醒,直到遇到某個EXCLUSIVE的Node } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false;//若是已經沒時間了,直接return false if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)//大於某個閾值,才park,不然進入自旋 LockSupport.parkNanos(this, nanosTimeout);//調用帶超時的park方法 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
能夠看到,跟不帶超時的doAcquireSharedInterruptibly方法相比,區別主要在於每次for循環期間,檢查時間是否過時和調用帶超時的park。nanosTimeout > spinForTimeoutThreshold這個判斷主要是由於park/unpark自己也須要花時間,爲了更準確地完成超時的機制,在超時時間立刻就要到了的時候,就進入自旋,再也不park了,這應該是Doug Lea測試了park/unpark時間比1000納秒要長吧。
/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;
