AQS(AbstractQueuedSynchronizer)抽象隊列同步器,屬於多線程編程的基本工具;JDK對其定義得很詳細,並提供了多種經常使用的工具類(重入鎖,讀寫鎖,信號量,CyclicBarrier,CountDownLatch),在閱讀源碼的時候,我是從具體工具類往上讀的,這樣會比較便於理解AQS的設計。node
下面,我將從五種經常使用類去分析源碼,進而學習AQS。編程
論文地址安全
咱們要閱讀的重入鎖,它首先遵循Lock的規範,而且實現了序列化接口;而Lock的規範,必然定義瞭如何鎖的,如何解鎖的,而且規定了newCondition這個方法。而重入鎖中,真正使用AQS的是他裏面內涵的一個實現類Sync,它繼承自AQS,並具備AQS的全部規範。多線程
這個內涵的Sync,在重入鎖中實現了兩種類型的隊列,一個是公平隊列,另外一個是非公平隊列,這取決於你構造重入鎖的時候傳入的是哪個,默認是非公平鎖;less
咱們在進入lock這個方法的時候,看到它真正調用的是acquire方法,而acquire方法,是AQS的一個標準定義;咱們先進入公平鎖的閱讀;工具
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
咱們往上找,就找到了AQS裏的acquire方法;這個方法寫的很是乾淨,首先申請一個arg數量的權限,若是申請不成功,則進入等待隊列;這個tryAcquire方法,是在子類實現的;這裏插入一下,AQS裏存在一個state字段,它表示可一個許可,而重入鎖中它初始化爲0;而後咱們找到公平鎖的實現方法。oop
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
在重入鎖中,咱們要能獲取鎖,實際上是state是否等於0;性能
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
這個方法裏面有幾種狀況會返回false,(h != t)表示等待隊列非空,爲空就會返回false;另外一種狀況就是,隊列非空,當前隊列不等於後繼結點的隊列,會返回true;由於是公平隊列,你要申請權限必然是沒人排隊,即便有人排隊,也得是你最前面才能申請;ok,下一個條件就是CAS這個state值,成功就將獨佔狀態設爲當前線程;這個else if,就是重入鎖重入的關鍵了,若是當前線程和獨佔線程是一個,那就將權限再加acquires,固然這個state會超過上限並拋出overflow相似的異常。學習
若是申請不成功,固然要排隊了,排隊都是雙端隊列的CRUD;ui
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
CAS初始化head,並頭尾指向一個地方;而後注意到外面是否是有一個for,又是自旋CAS的操做;在第二次循環的時候,會將node,整在後面。
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
這個方法,就是獲得的剛纔入隊的Node,而且又來一個自旋;而後看本身是否是頭結點,若是不是,則進入等待隊列,使用LockSupport來使當期線程休眠;這樣就構成了申請鎖並排隊的過程;
接着咱們去看unlock方法,它指向的是AQS的release接口,與acquire相反,它是將state作減法;
而這個方法,只有是獨佔線程調用才能夠,由於全部lock的非獨佔線程,全都會被park;
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
因此在這裏並不須要CAS,它的安全性在於不是獨佔線程就會拋出非法異常;若是它釋放成功,就會喚醒後繼結點;後繼結點是head的後一個,這時候後繼結點被阻塞在lock代碼行能夠往下走了。這樣就造成了一個線程同步的重入鎖(公平)。咱們能夠看到AQS的設計很精湛,不少方法,都是重寫定製的,它值作了一些規範的定義。
抽象隊列同步器,它是基於一個隊列作線程排隊的設計,那麼這個隊列的基本元素Node,咱們看一下。
定義了一系列的狀態,攜帶每一次申請的線程thread,等等,很是直觀,註釋裏還給你畫出來了。
看完公平鎖,咱們瞭解到它每次申請都要日後排隊,可想而知費公平鎖,就是不排隊?仍是要排隊的。
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
咱們垂手可得找到上面的方法,它走的流程多了一個,就是若是許但是能夠申請的,則不須要排隊,直接申請,若是申請失敗,則走入公平鎖的流程。這就是說全部來申請鎖的線程,都有一次競爭的機會,若是沒有競爭上,仍是排隊。而release接口並未區別實現,因此每次unpark線程,仍是按照隊列順序。
重入鎖就這麼簡單的讀完了。接着,公平鎖和非公平鎖的性能區別在哪呢?在於重複的park和unpark線程,對於非公平鎖,線程被park的概率會小一點,由於它不是必然排隊;而公平鎖必然是排隊的,它們的排隊機制是同樣的,而非公平鎖park線程的概率更小,則性能優於公平鎖。
看個圖,就不須要讀了。一把讀鎖,一把寫鎖,有興趣能夠本身研讀。
信號量是與重入鎖徹底不一樣類型的鎖,由於他是共享的(搞這麼複雜,不就是state初始化大於0)。它的做用用過的都有印象,就是多個線程能夠共享這一把鎖,而線程大於初始化憑證以後,就會被阻塞。
/** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
我相信能夠很簡單地找到這個FairSync,在semaphore中。它先判斷是否只有你來申請,若是不是就回去操做state,若是申請小於0,直接返回,排隊。
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
與重入鎖不一樣的是,入隊的時候Node指定爲SHARE模式,而且再次嘗試獲取鎖,若是獲取的鎖是大於等於0的,將會調用setHeadAndPropagate方法傳播釋放。若是是大於0的,則會調用release接口,下一個喚醒的線程又會重複上述過程,一直喚醒到==0。和重複鎖不同的是,它具備傳染性。
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
只要調用了這個方法,就會進行上述所說的傳染;正經的,這個方法就是激活線程並設置PROPAGATE,表示一直日後傳播激活。
共享鎖和獨佔鎖的區別在於,解鎖線程會不會傳染...
這樣,咱們已經讀了aqs裏面的兩種鎖了。
在看完上面的基本元素以後,搞一個倒計時器是什麼鬼,不就是搞個變量而後一直減,而後等於0的時候一古腦兒釋放???
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
很容易找到上述代碼,咱們調用倒計時器時候,就會調用countDown方法,每次都會給初始化減一。這時候主線程或者等待線程,調用await在等待。直到它減到0,就會作doReleaseShared,這個時候等待隊列只有一個,就是父級線程,它就能夠往下走了。由於這貨減到0以後不會reset,因此不能複用。。。。
我曹?這貨可貴一筆,停更。