最近在學習併發編程原理,因此準備整理一下本身學到的知識,先寫一篇CountDownLatch的源碼分析,以後但願能夠慢慢寫完整個併發編程。java
CountDownLatch是java的JUC併發包裏的一個工具類,能夠理解爲一個倒計時器,主要是用來控制多個線程之間的通訊。
好比有一個主線程A,它要等待其餘4個子線程執行完畢以後才能執行,此時就能夠利用CountDownLatch來實現這種功能了。node
public static void main(String[] args){ System.out.println("主線程和他的兩個小兄弟約好去吃火鍋"); System.out.println("主線程進入了飯店"); System.out.println("主線程想要開始動筷子吃飯"); //new一個計數器,初始值爲2,當計數器爲0時,主線程開始執行 CountDownLatch latch = new CountDownLatch(2); new Thread(){ public void run() { try { System.out.println("子線程1——小兄弟A 正在到飯店的路上"); Thread.sleep(3000); System.out.println("子線程1——小兄弟A 到飯店了"); //一個小兄弟到了,計數器-1 latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); new Thread(){ public void run() { try { System.out.println("子線程2——小兄弟B 正在到飯店的路上"); Thread.sleep(3000); System.out.println("子線程2——小兄弟B 到飯店了"); //另外一個小兄弟到了,計數器-1 latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); //主線程等待,直到其餘兩個小兄弟也進入飯店(計數器==0),主線程才能吃飯 latch.await(); System.out.println("主線程終於能夠開始吃飯了~"); }
核心代碼:編程
CountDownLatch latch = new CountDownLatch(1); latch.await(); latch.countDown();
其中構造函數的參數是計數器的值;
await()方法是用來阻塞線程,直到計數器的值爲0
countDown()方法是執行計數器-1操做併發
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
這段代碼很簡單,首先if判斷傳入的count是否<0,若是小於0直接拋異常。
而後new一個類Sync,這個Sync是什麼呢?咱們一塊兒來看下函數
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } //嘗試獲取共享鎖 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //嘗試釋放共享鎖 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
能夠看到Sync是一個內部類,繼承了AQS,AQS是一個同步器,以後咱們會詳細講。
其中有幾個核心點:工具
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
能夠看到,實際上是經過內部類Sync調用了父類AQS的acquireSharedInterruptibly()方法。oop
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //判斷線程是不是中斷狀態 if (Thread.interrupted()) throw new InterruptedException(); //嘗試獲取state的值 if (tryAcquireShared(arg) < 0)//step1 doAcquireSharedInterruptibly(arg);//step2 }
tryAcquireShared(arg)這個方法就是咱們剛纔在Sync內看到的重寫父類AQS的方法,意思就是判斷是否getState() == 0,若是state爲0,返回1,則step1處不進入if體內acquireSharedInterruptibly(int arg)方法執行完畢。若state!=0,則返回-1,進入if體內step2處。 源碼分析
下面咱們來看acquireSharedInterruptibly(int arg)方法:學習
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //step一、把當前線程封裝爲共享類型的Node,加入隊列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //step二、獲取當前node的前一個元素 final Node p = node.predecessor(); //step三、若是前一個元素是隊首 if (p == head) { //step四、再次調用tryAcquireShared()方法,判斷state的值是否爲0 int r = tryAcquireShared(arg); //step五、若是state的值==0 if (r >= 0) { //step六、設置當前node爲隊首,並嘗試釋放共享鎖 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //step七、是否能夠安心掛起當前線程,是就掛起;而且判斷當前線程是否中斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { //step八、若是出現異常,failed沒有更新爲false,則把當前node從隊列中取消 if (failed) cancelAcquire(node); } }
按照代碼中的註釋,咱們能夠大概瞭解該方法的內容,下面咱們來仔細看下其中調用的一些方法是幹什麼的。
一、首先看addWaiter()ui
//step1 private Node addWaiter(Node mode) { //把當前線程封裝爲node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //獲取當前隊列的隊尾tail,並賦值給pred Node pred = tail; //若是pred!=null,即當前隊尾不爲null if (pred != null) { //把當前隊尾tail,變成當前node的前繼節點 node.prev = pred; //cas更新當前node爲新的隊尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //若是隊尾爲空,走enq方法 enq(node);//step1.1 return node; } ----------------------------------------------------------------- //step1.1 private Node enq(final Node node) { for (;;) { Node t = tail; //若是隊尾tail爲null,初始化隊列 if (t == null) { // Must initialize //cas設置一個新的空node爲隊首 if (compareAndSetHead(new Node())) tail = head; } else { //cas把當前node設置爲新隊尾,把前隊尾設置成當前node的前繼節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
二、接下來咱們在來看setHeadAndPropagate()方法,看其內部實現
//step6 private void setHeadAndPropagate(Node node, int propagate) { //獲取隊首head Node h = head; // Record old head for check below //設置當前node爲隊首,並取消node所關聯的線程 setHead(node); // if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //若是當前node的後繼節點爲null或者是shared類型的 if (s == null || s.isShared()) //釋放鎖,喚醒下一個線程 doReleaseShared();//step6.1 } } -------------------------------------------------------------------- //step6.1 private void doReleaseShared() { for (;;) { //找到頭節點 Node h = head; if (h != null && h != tail) { //獲取頭節點狀態 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //喚醒head節點的next節點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
public void countDown() { sync.releaseShared(1); }
能夠看到調用的是父類AQS的releaseShared 方法
public final boolean releaseShared(int arg) { //state-1 if (tryReleaseShared(arg)) {//step1 //喚醒等待線程,內部調用的是LockSupport.unpark方法 doReleaseShared();//step2 return true; } return false; } ------------------------------------------------------------------ //step1 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { //獲取當前state的值 int c = getState(); if (c == 0) return false; int nextc = c-1; //cas操做來進行原子減1 if (compareAndSetState(c, nextc)) return nextc == 0; } }
CountDownLatch主要是經過計數器state來控制是否能夠執行其餘操做,若是不能就經過LockSupport.park()方法掛起線程,直到其餘線程執行完畢後喚醒它。
下面咱們經過一個簡單的圖來幫助咱們理解一下:
PS:本人也是還在學習的路上,理解的也不是特別透徹,若有錯誤,願傾聽教誨。^_^