@java
JDK1.5之前只有synchronized同步鎖,而且效率很是低,所以大神Doug Lea本身寫了一套併發框架,這套框架的核心就在於AbstractQueuedSynchronizer類(即AQS),性能很是高,因此被引入JDK包中,即JUC。那麼AQS是怎麼實現的呢?本篇就是對AQS及其相關組件進行分析,瞭解其原理,並領略大神的優美而又精簡的代碼。node
AQS是JUC下最核心的類,沒有之一,因此咱們先來分析一下這個類的數據結構。
緩存
AQS內部是使用了雙向鏈表將等待線程連接起來,當發生併發競爭的時候,就會初始化該隊列並讓線程進入睡眠等待喚醒,同時每一個節點會根據是否爲共享鎖標記狀態爲共享模式或獨佔模式。這個數據結構須要好好理解並緊緊記住,下面分析的組件都將基於此實現。安全
Lock是一個接口,提供了加/解鎖的通用API,JUC主要提供了兩種鎖,ReentrantLock和ReentrantReadWriteLock,前者是重入鎖,實現Lock接口,後者是讀寫鎖,自己並無實現Lock接口,而是其內部類ReadLock或WriteLock實現了Lock接口。先來看看Lock都提供了哪些接口:性能優化
// 普通加鎖,不可打斷;未獲取到鎖進入AQS阻塞 void lock(); // 可打斷鎖 void lockInterruptibly() throws InterruptedException; // 嘗試加鎖,未獲取到鎖不阻塞,返回標識 boolean tryLock(); // 帶超時時間的嘗試加鎖 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 解鎖 void unlock(); // 建立一個條件隊列 Condition newCondition();
看到這裏讀者們能夠先思考下,本身如何來實現上面這些接口。數據結構
synchronized和ReentrantLock都是可重入的,後者使用更加靈活,也提供了更多的高級特性,但其本質的實現原理是差很少的(聽說synchronized是借鑑了ReentrantLock的實現原理)。ReentrantLock提供了兩個構造方法:多線程
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
有參構造是根據參數建立公平鎖或非公平鎖,而無參構造默認則是非公平鎖,由於非公平鎖性能很是高,而且大部分業務並不須要使用公平鎖。至於爲何非公平鎖性能很高,我們接着往下看。併發
非公平鎖和公平鎖在實現上基本一致,只有個別的地方不一樣,所以下面會採用對比分析方法進行分析。
從lock方法開始:app
public void lock() { sync.lock(); }
其實是委託給了內部類Sync,該類實現了AQS(其它組件實現方法也基本上都是這個套路);因爲有公平和非公平兩種模式,所以該類又實現了兩個子類:FairSync和NonfairSync:框架
// 非公平鎖 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // 公平鎖 final void lock() { acquire(1); }
這裏就是公平鎖和非公平鎖的第一個不一樣,非公平鎖首先會調用CAS將state從0改成1,若是能改爲功則表示獲取到鎖,直接將exclusiveOwnerThread設置爲當前線程,不用再進行後續操做;不然則同公平鎖同樣調用acquire方法獲取鎖,這個是在AQS中實現的模板方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這裏兩種鎖惟一不一樣的實現就是tryAcquire方法,先來看非公平鎖的實現:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
state=0表示尚未被線程持有鎖,直接經過CAS修改,能修改爲功的就獲取到鎖,修改失敗的線程先判斷exclusiveOwnerThread是否是當前線程,是則state+1,表示重入次數+1並返回true,加鎖成功,不然則返回false表示嘗試加鎖失敗並調用acquireQueued入隊。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // 首尾不相等且頭結點線程不是當前線程則表示須要進入隊列 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
上面就是公平鎖的嘗試獲取鎖的代碼,能夠看到基本和非公平鎖的代碼是同樣的,區別在於首次加鎖須要判斷是否已經有隊列存在,沒有才去加鎖,有則直接返回false。
接着來看addWaiter方法,當嘗試加鎖失敗時,首先就會調用該方法建立一個Node節點並添加到隊列中去。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 尾節點不爲null表示已經存在隊列,直接將當前線程做爲尾節點 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾結點不存在則表示尚未初始化隊列,須要初始化隊列 enq(node); return node; } private Node enq(final Node node) { // 自旋 for (;;) { Node t = tail; if (t == null) { // 只會有一個線程設置頭節點成功 if (compareAndSetHead(new Node())) tail = head; } else { // 其它設置頭節點失敗的都會自旋設置尾節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這裏首先傳入了一個獨佔模式的空節點,並根據該節點和當前線程建立了一個Node,而後判斷是否已經存在隊列,若存在則直接入隊,不然調用enq方法初始化隊列,提升效率。
此處還有一個很是細節的地方,爲何設置尾節點時都要先將以前的尾節點設置爲node.pre的值呢,而不是在CAS以後再設置?好比像下面這樣:
if (compareAndSetTail(pred, node)) { node.prev = pred; pred.next = node; return node; }
由於若是這樣作的話,在CAS設置完tail後會存在一瞬間的tail.pre=null的狀況,而Doug Lea正是考慮到這種狀況,不論什麼時候獲取tail.pre都不會爲null。
接着看acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) { // 爲true表示存在須要取消加鎖的節點,僅從這段代碼能夠看出, // 除非發生異常,不然不會存在須要取消加鎖的節點。 boolean failed = true; try { // 打斷標記,由於調用的是lock方法,因此是不可打斷的 // (但其實是打斷了的,只不過這裏採用了一種**靜默**處理方式,稍後分析) boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
這裏就是隊列中線程加鎖/睡眠的核心邏輯,首先判斷剛剛調用addWaiter方法添加到隊列的節點是不是頭節點,若是是則再次嘗試加鎖,這個剛剛分析過了,非公平鎖在這裏就會再次搶一次鎖,搶鎖成功則設置爲head節點並返回打斷標記;不然則和公平鎖同樣調用shouldParkAfterFailedAcquire判斷是否應該調用park方法進入睡眠。
爲何在park前須要這麼一個判斷呢?由於當前節點的線程進入park後只能被前一個節點喚醒,那前一個節點怎麼知道有沒有後繼節點須要喚醒呢?所以當前節點在park前須要給前一個節點設置一個標識,即將waitStatus設置爲Node.SIGNAL(-1),而後自旋一次再走一遍剛剛的流程,若仍是沒有獲取到鎖,則調用parkAndCheckInterrupt進入睡眠狀態。
讀者可能會比較好奇Thread.interrupted這個方法是作什麼用的。
public static boolean interrupted() { return currentThread().isInterrupted(true); }
這個是用來判斷當前線程是否被打斷過,並清除打斷標記(如果被打斷過則會返回true,並將打斷標記設置爲false),因此調用lock方法時,經過interrupt也是會打斷睡眠的線程的,只是Doug Lea作了一個假象,讓用戶無感知;但有些場景又須要知道該線程是否被打斷過,因此acquireQueued最終會返回interrupted打斷標記,若是是被打斷過,則返回的true,並在acquire方法中調用selfInterrupt再次打斷當前線程(將打斷標記設置爲true)。
這裏咱們對比看看lockInterruptibly的實現:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
能夠看到區別就在於使用lockInterruptibly加鎖被打斷後,是直接拋出InterruptedException異常,咱們能夠捕獲這個異常進行相應的處理。
最後來看看cancelAcquire是如何取消加鎖的,該狀況比較特殊,簡單瞭解下便可:
private void cancelAcquire(Node node) { if (node == null) return; // 首先將線程置空 node.thread = null; // waitStatus > 0表示節點處於取消狀態,則直接將當前節點的pre指向在此以前的最後一個有效節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 保存前一個節點的下一個節點,若是在此以前存在取消節點,這裏就是以前取消被取消節點的頭節點 Node predNext = pred.next; node.waitStatus = Node.CANCELLED; // 當前節點是tail節點,則替換尾節點,替換成功則將新的尾結點的下一個節點設置爲null; // 不然須要判斷是將當前節點的下一個節點賦值給最後一個有效節點,仍是喚醒下一個節點。 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 併發狀況下,可能已經被其它線程喚醒或已經取消,則從後向前找到最後一個有效節點並喚醒 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
解鎖就比較簡單了,先調用tryRelease對state執行減一操做,若是state==0,則表示徹底釋放鎖;若果存在後繼節點,則調用unparkSuccessor喚醒後繼節點,喚醒後的節點的waitStatus會從新被設置爲0.
只是這裏有一個小細節,爲何是從後向前找呢?由於咱們在開始說過,設置尾節點保證了node.pre不會爲null,但pre.next仍有多是null,因此這裏只能從後向前找到最後一個有效節點。
上面是ReentrantLock的加鎖流程,能夠看到整個流程不算複雜,只是判斷和跳轉比較多,主要是Doug Lea將代碼和性能都優化到了極致,代碼很是精簡,但細節卻很是多。另外經過上面的分析,咱們也能夠發現,公平鎖和非公平鎖的區別就在於非公平鎖無論是否有線程在排隊,先搶三次鎖,而公平鎖則會判斷是否存在隊列,有線程在排隊則直接進入隊列排隊;另外線程在park被喚醒後非公平鎖還會搶鎖,公平鎖仍然須要排隊,因此非公平鎖的性能比公平鎖高不少,大部分狀況下咱們使用非公平鎖便可。
ReentrantLock是一把獨佔鎖,只支持重入,不支持共享,因此JUC包下還提供了讀寫鎖,這把鎖支持讀讀併發,但讀寫、寫寫都是互斥的。
讀寫鎖也是基於AQS實現的,也包含了一個繼承自AQS的內部類Sync,一樣也有公平和非公平兩種模式,下面主要討論非公平模式下的讀寫鎖實現。
讀寫鎖實現相對比較複雜,在ReentrantLock中就是使用的int型的state屬性來表示鎖被某個線程佔有和重入次數,而ReentrantReadWriteLock分爲了讀和寫兩種鎖,要怎麼用一個字段表示兩種鎖的狀態呢?Doug Lea大師將state字段分爲了高二字節和低二字節,即高16位用來表示讀鎖狀態,低16位則用來表示寫鎖,以下圖:
由於讀寫鎖狀態都只用了兩個字節,因此可重入的次數最可能是65535,固然正常狀況下重入是不可能達到這麼多的。
那它是怎麼實現的呢?仍是先從構造方法開始:
public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
一樣默認就是非公平鎖,同時還建立了readerLock和writerLock兩個對象,咱們只須要像下面這樣就能獲取到讀寫鎖:
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private static Lock r = lock.readLock(); private static Lock w = lock.writeLock();
因爲寫鎖的加鎖過程相對更簡單,下面先從寫鎖加鎖開始分析,入口在ReentrantReadWriteLock#WriteLock.lock()方法,點進去看,發現仍是使用的AQS中的acquire方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
因此不一樣的地方也只有tryAcquire方法,咱們重點分析這個方法就行:
static final int SHARED_SHIFT = 16; // 65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 低16位是1111....1111 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 獲得c低16位的值 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); // 獲取寫鎖加鎖和重入的次數 int w = exclusiveCount(c); if (c != 0) { // 已經有線程持有鎖 // 這裏有兩種狀況:1. c!=0 && w==0表示有線程獲取了讀鎖,不管是否是當前線程,直接返回false, // 也就是說讀-寫鎖是不支持升級重入的(但支持寫-讀降級),緣由後文會詳細分析; // 2. c!=0 && w!=0 && current != getExclusiveOwnerThread()表示有其它線程持有了寫鎖,寫寫互斥 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 超出65535,拋異常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 不然寫鎖的次數直接加1 setState(c + acquires); return true; } // c==0纔會走到這,但這時存在兩種狀況,有隊列和無隊列,因此公平鎖和非公平鎖處理不一樣, // 前者須要判斷是否存在隊列,有則嘗試加鎖失敗,無則加鎖成功,而非公平鎖直接使用CAS加鎖便可 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
寫鎖嘗試加鎖的過程就分析完了,其他的部分上文已經講過,這裏再也不贅述。
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
讀鎖在加鎖開始就和其它鎖不一樣,調用的是acquireShared方法,意爲獲取共享鎖。
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 右移16位獲得讀鎖狀態的值 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 爲何讀寫互斥?由於讀鎖一上來就判斷了是否有其它線程持有了寫鎖(當前線程持有寫鎖再獲取讀鎖是能夠的) if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // 公平鎖判斷是否存在隊列,非公平鎖判斷第一個節點是否是EXCLUSIVE模式,是的話會返回true // 返回false則須要判斷讀鎖加鎖次數是否超過65535,沒有則使用CAS給讀鎖+1 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 第一個讀鎖線程就是當前線程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 記錄讀鎖的重入 firstReaderHoldCount++; } else { // 獲取最後一次加讀鎖的重入次數記錄器HoldCounter HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 當前線程第一次重入須要初始化,以及當前線程和緩存的最後一次記錄器的線程id不一樣,須要從ThreadLocalHoldCounter拿到對應的記錄器 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 緩存到ThreadLocal readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
這段代碼有點複雜,首先須要保證讀寫互斥,而後進行初次加鎖,若加鎖失敗就會調用fullTryAcquireShared方法進行兜底處理。在初次加鎖中與寫鎖不一樣的是,寫鎖的state能夠直接用來記錄寫鎖的重入次數,由於寫寫互斥,但讀鎖是共享的,state用來記錄讀鎖的加鎖次數了,重入次數該怎麼記錄呢?重入是指同一線程,那麼是否是可使用ThreadLocl來保存呢?沒錯,Doug Lea就是這麼處理的,新增了一個HoldCounter類,這個類只有線程id和重入次數兩個字段,當線程重入的時候就會初始化這個類並保存在ThreadLocalHoldCounter類中,這個類就是繼承ThreadLocl的,用來初始化HoldCounter對象並保存。
這裏還有個小細節,爲何要使用cachedHoldCounter緩存最後一次加讀鎖的HoldCounter?由於大部分狀況下,重入和釋放鎖的線程頗有可能就是最後一次加鎖的線程,因此這樣作可以提升加解鎖的效率,Doug Lea真是把性能優化到了極致。
上面只是初次加鎖,有可能會加鎖失敗,就會進入到fullTryAcquireShared方法:
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
這個方法中代碼和tryAcquireShared基本上一致,只是採用了自旋的方式,處理初次加鎖中的漏網之魚,讀者們可自行閱讀分析。
上面兩個方法若返回大於0則表示加鎖成功,小於0則會調用doAcquireShared方法,這個就和以前分析的acquireQueued差很少了:
private void doAcquireShared(int arg) { // 先添加一個SHARED類型的節點到隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { // 再次嘗試加讀鎖 int r = tryAcquireShared(arg); if (r >= 0) { // 設置head節點以及傳播喚醒後面的讀線程 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 只有前一個節點的waitStatus=-1時纔會park,=0或者-3(先不考慮-2和1的狀況)都會設置爲-1後再次自旋嘗試加鎖,若仍是加鎖失敗就會park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { // 設置頭節點 Node h = head; // Record old head for check below setHead(node); // propagate是tryAcquireShared的返回值,當前線程加鎖成功還要去喚醒後繼的共享節點 // (其他的判斷比較複雜,筆者也還未想明白,知道的讀者能夠指點一下) if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 判斷後繼節點是不是共享節點 if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; // 存在後繼節點 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 當前一個節點加鎖成功後天然須要將-1改回0,並喚醒後繼線程,同時自旋將0改成-2讓喚醒傳播下去 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } // 設置頭節點的waitStatus=-2,使得喚醒能夠傳播下去 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這裏的邏輯也很是的繞,當多個線程同時調用addWaiter添加到隊列中後,而且假設這些節點的第一個節點的前一個節點就是head節點,那麼第一個節點就能加鎖成功(假設都是SHARED節點),其他的節點在第一個節點設置頭節點以前都會進入shouldParkAfterFailedAcquire方法,這時候waitStatus都等於0,因此繼續自旋不會park,若再次加鎖還失敗就會park(由於這時候waitStatus=-1),但都是讀線程的狀況下通常都不會出現,由於setHeadAndPropagate第一步就是修改head,因此其他SHARED節點最終都能加鎖成功並一直將喚醒傳播下去。
以上就是讀寫鎖加鎖過程,解鎖比較簡單,這裏就不詳細分析了。
讀寫鎖將state分爲了高二字節和低二字節,分別存儲讀鎖和寫鎖的狀態,實現更爲的複雜,在使用上還有幾點須要注意:
private static void rw() { r.lock(); try { log.info("獲取到讀鎖"); w.lock(); try { log.info("獲取到寫鎖"); } finally { w.unlock(); } } finally { r.unlock(); } }
多個線程訪問都能獲取到讀鎖,但讀寫互斥,彼此都要等待對方的讀鎖釋放才能獲取到寫鎖,這就形成了死鎖。
ReentrantReadWriteLock在某些場景下性能上不算高,所以Doug Lea在JDK1.8的時候又提供了一把高性能的讀寫鎖StampedLock,前者讀寫鎖都是悲觀鎖,然後者提供了新的模式——樂觀鎖,但它不是基於AQS實現的,本文不進行分析。
Lock接口中還有一個方法newCondition,這個方法就是建立一個條件隊列:
public Condition newCondition() { return sync.newCondition(); } final ConditionObject newCondition() { return new ConditionObject(); }
所謂條件隊列就是建立一個新的ConditionObject對象,這個對象的數據結構在開篇就看過了,包含首、尾兩個節點字段,每當調用Condition#await方法時就會在對應的Condition對象中排隊等待:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 加入條件隊列 Node node = addConditionWaiter(); // 由於Condition.await必須配合Lock.lock使用,因此await時就是將已得到鎖的線程所有釋放掉 int savedState = fullyRelease(node); int interruptMode = 0; // 判斷是在同步隊列仍是條件隊列,後者則直接park while (!isOnSyncQueue(node)) { LockSupport.park(this); // 獲取打斷處理方式(拋出異常或重設標記) if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 調用aqs的方法 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled // 清除掉已經進入同步隊列的節點 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // 清除狀態爲取消的節點 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 建立一個CONDITION狀態的節點並添加到隊列末尾 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
await方法實現比較簡單,大部分代碼都是上文分析過的,這裏再也不重複。接着來看signal方法:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 從條件隊列第一個節點開始喚醒 Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // 修改waitStatus狀態,若是修改失敗,則說明該節點已經從條件隊列轉移到了同步隊列 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 上面修改爲功,則將該節點添加到同步隊列末尾,並返回以前的尾結點 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // unpark當前線程,結合await方法看 LockSupport.unpark(node.thread); return true; }
signal的邏輯也比較簡單,就是喚醒條件隊列中的第一個節點,主要是要結合await的代碼一塊兒理解。
上文分析的鎖都是用來實現併發安全控制的,而對於多線程協做JUC又基於AQS提供了CountDownLatch、CyclicBarrier、Semaphore等組件,下面一一分析。
CountDownLatch在建立的時候就須要指定一個計數:
CountDownLatch countDownLatch = new CountDownLatch(5);
而後在須要等待的地方調用countDownLatch.await()方法,而後在其它線程完成任務後調用countDownLatch.countDown()方法,每調用一次該計數就會減一,直到計數爲0時,await的地方就會自動喚醒,繼續後面的工做,因此CountDownLatch適用於一個線程等待多個線程的場景,那它是怎麼實現的呢?讀者們能夠結合上文本身先思考下。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
與前面講的鎖同樣,也有一個內部類Sync繼承自AQS,而且在構造時就將傳入的計數設置到了state屬性,看到這裏不難猜到CountDownLatch的實現原理了。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
在await方法中使用的是可打斷的方式獲取的共享鎖,一樣除了tryAcquireShared方法,其他的都是複用的以前分析過的代碼,而tryAcquireShared就是判斷state是否等於0,不等於就阻塞。
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
而調用countDown就更簡單了,每次對state遞減,直到爲0時纔會調用doReleaseShared釋放阻塞的線程。
最後須要注意的是CountDownLatch的計數是不支持重置的,每次使用都要新建一個。
CyclicBarrier和CountDownLatch使用差很少,不過它只有await方法。CyclicBarrier在建立時一樣須要指定一個計數,當調用await的次數達到計數時,全部線程就會同時喚醒,至關於設置了一個「起跑線」,須要等全部運動員都到達這個「起跑線」後才能一塊兒開跑。另外它還支持重置計數,提供了reset方法。
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
CyclicBarrier提供了兩個構造方法,咱們能夠傳入一個Runnable類型的回調函數,當達到計數時,由最後一個調用await的線程觸發執行。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); // 是否打斷,打斷會喚醒全部條件隊列中的線程 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 計數爲0時,喚醒條件隊列中的全部線程 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { // 不帶超時時間直接進入條件隊列等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
這裏邏輯比較清晰,就是使用了ReentrantLock以及Condition來實現。在構造方法中咱們能夠看到保存了兩個變量count和parties,每次調用await都會對count變量遞減,count不爲0時都會進入到trip條件隊列中等待,不然就會經過signalAll方法喚醒全部的線程,並將parties從新賦值給count。
reset方法很簡單,這裏不詳細分析了。
Semaphore是信號的意思,或者說許可,能夠用來控制最大併發量。初始定義好有幾個信號,而後在須要獲取信號的地方調用acquire方法,執行完成後,須要調用release方法回收信號。
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
它也有兩個構造方法,能夠指定公平或是非公平,而permits就是state的值。
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 非公平方式 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 公平方式 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
acquire方法和CountDownLatch是同樣的,只是tryAcquireShared區分了公平和非公平方式。獲取到信號至關於加共享鎖成功,不然則進入隊列阻塞等待;而release方法和讀鎖解鎖方式也是同樣的,只是每次release都會將state+1。
本文詳細分析了AQS的核心原理、鎖的實現以及經常使用的相關組件,掌握其原理能讓咱們準確的使用JUC下面的鎖以及線程協做組件。另外AQS代碼設計是很是精良的,有很是多的細節,精簡的代碼中把全部的狀況都考慮到了,細細體味對咱們自身編碼能力也會有很大的提升。 文章錯誤和不清楚的地方歡迎批評指出,另外超時相關的API本文都未涉及到,讀者可自行分析。