要看CountDownLatch源碼,你會發現其中的核心是由一個繼承了AbstractQueuedSynchronizer類的靜態內部類Sync。 實際上ReentrantLock,Semaphore等線程控制類的內部都是基於AbstractQueuedSynchronizer實現的。 先來了解一下AbstractQueuedSynchronizer。node
隊列同步器:用來構建鎖或者其它同步組件的基礎框架。它內部使用一個int型成員變量state表示同步狀態,經過一個內置的FIFO隊列來完成資源獲取線程的排隊工做。 AQS就是提供了獲取鎖,FIFO排隊,釋放鎖這些邏輯的基礎框架。ReentrantLock,Semaphore,CountDownLatch等併發類都是基於AQS實現的。他們的內部都有一個靜態內部類繼承了AQS。 至關於就是說,ReentrantLock,Semaphore,CountDownLatch等併發類只是提供給使用者的門面,內部核心邏輯是由AQS實現的。
AQS中的一些核心方法:併發
# 修改線程同步狀態的方法: getState(): 獲取當前同步狀態 setState(int newState): 設置當前同步裝 compareAndSetState(int expect,int update):使用CAS樂觀鎖設置當前同步狀態,保證原子性 # 獨佔式/共享式獲取/釋放同步狀態: tryAcquire(int arg) 獨佔式獲取同步狀態 tryRelease(int arg) 獨佔式釋放同步狀態 tryAcquireShared(int arg) 共享式獲取同步狀態 tryReleaseShared(int arg) 共享式釋放同步狀態 isHeldExclusively() 當前同步器釋放在獨佔模式下被線程佔用。返回true表示被當前線程獨佔 # 獲取同步隊列的相關信息 getQueuedThreads() 獲取等待在同步隊列上的線程集合
同步器依賴於內部的FIFO隊列來進行同步狀態管理。當線程獲取同步狀態失敗時,同步器會將當前線程構形成一個節點Node並加入同步隊列,而且會阻塞當前線程。當同步狀態釋放時,會把隊列中的第一個線程喚醒,使其獲取同步狀態。框架
那麼首先來看看這個同步隊列是什麼樣子的吧。 查看Node節點類:源碼分析
static final class Node { /**標記這個節點是處於共享模式*/ static final Node SHARED = new Node(); /**標記這個節點是處於獨佔模式*/ static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /**表示線程的等待狀態。 1表示cancelled, 表示這個等待的線程等待超時或者被中斷,須要從同步隊列中取消等待; -1表示signal,後繼節點處於等待狀態,而當前節點若是釋放了同步狀態或者被取消,將會喚醒後繼節點,使其繼續運行 -2表示condition,當前線程等待在condition上,當其餘線程對condition調用了signal方法後,改節點就會移出等待隊列,去獲取同步狀態; -3表示propagate,表示下一次共享式同步狀態將會無條件傳播下去; 0表示initial,初始狀態*/ volatile int waitStatus; /**前一個節點*/ volatile Node prev; /**後一個節點*/ volatile Node next; /**也是用來表示下一個節點。若是當前節點是共享的,那麼這個字段就是SHARED常量*/ Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } /**返回上一個節點*/ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
能夠看出,Node節點中保存了線程引用,等待狀態,是共享仍是獨佔,先後節點引用等信息。ui
再看AQS的三個屬性:this
/**等待隊列的頭節點*/ private transient volatile Node head; /**等待隊列的尾節點*/ private transient volatile Node tail; /**這就是表示同步狀態的屬性*/ private volatile int state;
在AQS中維護了同步隊列的頭節點和尾節點,以及同步狀態。線程
因此同步隊列的結構就很清楚了: code
那麼線程是怎麼被放入隊列的呢?從acquire方法開始跟蹤:blog
//這個方法就是嘗試獲取獨佔鎖 public final void acquire(int arg) { //若是獲取不到鎖,就把當前線程加入到等待隊列進行等待 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 打斷當前線程,其實就是本身打斷本身 selfInterrupt(); }
先看一下addWaiter方法,這個方法就是給當前線程構造一個Node節點。繼承
private Node addWaiter(Node mode) { //mode表示當前線程是獨佔式仍是共享式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; //將當前節點加入到等待隊列的尾部 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //沒有尾節點的話就進行初始化 enq(node); return node; }
下面繼續跟蹤acquireQueued方法:
//這個方法就是在死循環等待鎖 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取前一個節點 final Node p = node.predecessor(); //若是前一個節點是頭節點,而且獲取到了鎖,那麼當前節點這隻爲頭節點,並返回。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
也就是說,等待隊列的頭節點實際上就是當前得到鎖的節點,後面的節點死循環一直等待。 可是隻有頭節點的後一個節點纔會不斷嘗試獲取鎖,由於要保證FIFO順序。 新進來的線程是放在隊列尾部進行死循環等待的。
那麼實際上等待隊列是這樣的流程:
下面再跟蹤釋放鎖的過程:
//釋放獨佔鎖 public final boolean release(int arg) { //判斷是否能夠釋放鎖 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
這裏當waitStatus不爲0的時候,纔會調用unparkSuccessor方法去喚醒後面一個方法。
什麼場景下head節點的waitStatus不爲0呢??
進入unparkSuccessor方法:
TODO 先略了,後面在補充。。。。。
瞭解的AQS的原理,那麼看 CountDownLatch的源碼就so easy了。由於CountDownLatch只是一個門面,核心邏輯就是AQS。