上期回顧:html
上次博客咱們主要說了咱們juc併發包下面的ReetrantLock的一些簡單使用和底層的原理,是如何實現公平鎖、非公平鎖的。內部的雙向鏈表究竟是什麼意思,prev和next究竟是什麼,爲何要引入heap和tail來值向null的Node節點。高併發時候是如何保證state來記錄重入鎖的,在咱們的上次博客都作了詳細的說明。此次咱們來聊一些簡單易懂且實用的AQS中的工具類。node
Semaphore信號量:多線程
這個東西很簡單,別看字面意思,什麼信號量,我也不懂得那個術語什麼意思,Semaphore你能夠這樣來理解,咱們要去看電影,並且是3D電影(必須戴3D眼鏡才能夠進入),可是比較不巧的是咱們電影院只有兩個3D眼鏡了,也就是說,咱們每次只能進去兩我的看電影,而後等待這兩我的看完電影之後把眼鏡還回來,後面的兩我的才能繼續觀看,就是說每次只容許最多進去兩我的,每次進入到線程獲取鎖,須要你獲得前置的票據,才能夠進行後續的流程。能夠理解爲一個簡單的限流吧。咱們來一下代碼示例。併發
public class Test { public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(2); for (int i = 0; i < 5; i++) { new Thread(new Task(semaphore,"xiaocaijishu"+i)).start(); } } static class Task extends Thread{ Semaphore semaphore; public Task(Semaphore semaphore,String tname){ this.semaphore = semaphore; this.setName(tname); } public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"拿着3D眼鏡進去了,時間是"+System.currentTimeMillis()); Thread.sleep(1000); semaphore.release(); System.out.println(Thread.currentThread().getName()+"出來了,將3D眼鏡還給了服務人員,時間是"+System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
運行結果就是這樣的高併發
咱們來解釋一下運行結果,線程1和線程3同一時間去看電影了,而後1出來了,這時線程9立刻拿着咱們的3D眼鏡進去了,過了一會線程3也看完電影了,出來了還了3D眼鏡,線程7又在同一時間拿着3D眼鏡進去看電影了,後續線程都是如此執行的,每次只是進入兩個線程。工具
簡單的使用看到了,咱們來看看底層的源碼設計吧。開始的時候咱們是建立一個Semaphore內部票據數目給予的是2。ui
//1.建立初始票據是2的Semaphore Semaphore semaphore = new Semaphore(2); //2.進入Semaphore,查看數據2是如何存儲的. public Semaphore(int permits) { sync = new NonfairSync(permits); } //3.底層仍是基於sync 建立了一個對象,但不一樣於過去ReetrantLock的是,此次是一個非公平的鎖對象,咱們再次進入NonfairSync看看那個數字2到底放在哪裏了. Sync(int permits) { setState(permits); } //4.咱們能夠看到底層仍是用State來存儲的.
此次沒有把全部代碼所有粘出來,感受那樣像是湊篇幅同樣。this
經過上述代碼,咱們能夠看到,咱們的初始票據數,是上一次那個state來存的。spa
後續咱們調用了acquire方法來嘗試獲取票據,acquire方法也能夠傳入獲取票據數目的好比semaphore.acquire(2);也是能夠的。咱們進入acquire方法來看看究竟是如何獲取的。線程
//從new Semaphore(2);點擊進入後續方法 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //咱們能夠看到,當咱們沒有傳須要獲取多少票據的時候,會默認給予1這個參數,咱們來繼續看後續流程 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //Thread.interrupted()判斷當前線程是否已經中斷,若是中斷我直接拋出異常,電影都演完了,我拿3D眼鏡還有毛線用. //tryAcquireShared(arg)嘗試獲取票據,arg是1,剛纔給予的默認1 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //內部有實現關係,因此調用的是Semaphore類nonfairTryAcquireShared方法,咱們來解讀一下 //直接就是一個死循環, int available = getState();獲取一下當前還有多少票據 // int remaining = available - acquires;計算出當前票據減去所需票據的一個剩餘值 //if (remaining < 0 || compareAndSetState(available, remaining))咱們現有2個票據,拿走1個,剩餘1個,因此remaining < 0 必定是false的 //再來看另外一半compareAndSetState,用原子計算(上次博客說過爲何要原子計算)方式來修改剩餘票據,這個是能夠修改爲功的.因此知足條件能夠返回一個2-1 也就是返回一個正數1
是否是有點看懵圈了,不少小夥伴感受if (remaining < 0 ||compareAndSetState(available, remaining))前面的remaining<0,這個或判斷貌似沒用啊,來張圖解釋一下。
有沒有感受好點了,本身能夠跟着源代碼走一走,獲取的過程就差一個doAcquireSharedInterruptibly尚未看了,若是獲取超過了票據數,也就是不該該讓返回負數時運行doAcquireSharedInterruptibly方法,咱們來看一下。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//以共享方式添加節點 boolean failed = true; try { for (;;) { final Node p = node.predecessor();//判斷前驅節點是否爲空 if (p == head) { int r = tryAcquireShared(arg);//再次嘗試獲取票據 if (r >= 0) {//>= 0表示獲取票據成功 setHeadAndPropagate(node, r);//更改頭節點 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && //剔除不可用的Node節點 parkAndCheckInterrupt()) //阻塞當前線程 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
通過兩次以上的嘗試,咱們將該線程阻塞了,不至於一直for循環在運行,也就這樣,票據發放完畢了。
過程差很少就是這樣的,咱們能夠再仔細看一下是如何添加節點的,上次ReetrantLock說了一些,咱們此次再來看一下。咱們現已第一次塞節點爲例,
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {//第一次必定是空的,咱們如今已初始塞節點爲例。 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);//爲空直接進入這個邏輯 return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //1.第一次必定是空 //二次循環不爲空 進入else if (compareAndSetHead(new Node()))//2.建立一個空節點,而且做爲head節點. tail = head;//3.tail指向那個head節點 } else { node.prev = t;//4. 將node節點的前驅指針指向 if (compareAndSetTail(t, node)) {//5.原子計算方式將node節點後驅節點指向tail t.next = node;//6.將t節點(空節點)的後驅指針指向node節點 return t; } } } }
第一次循環只是一個內部的初始空節點,第二次循環纔是移動指針塞入的過程。
節點喚醒是在釋放票據時被喚醒的,代碼超級簡單,能夠本身當作一份做業,本身去看一遍代碼吧~!提示流程就是先還票據,而後喚醒。Semaphore差很少就這些知識點,我也帶着你們簡單的看了一遍源碼。咱們再來繼續看一下後面AQS的一些工具類。
CountDownLaunch的基本使用
CountDownLaunch很好理解,也是比較實用的,咱們幹王者農藥的時候就是一個很好的栗子,遊戲選完人物你們一塊兒加載地圖等遊戲資料,有的人慢,有的人快,這時就印出來了CountDownLaunch,至關於咱們5個玩家同時開啓5個線程,而後一塊兒執行,執行完畢先等着,直到5個玩家所有執行完成時,才能夠運行後續操做。咱們來看一下代碼。
public class CountDownLaunchSample { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new playerOne(countDownLatch)).start(); new Thread(new playerTwo(countDownLatch)).start(); countDownLatch.await(); System.out.println("所有加載完成"); } static class playerOne implements Runnable { CountDownLatch countDownLatch; public playerOne(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void run() { try { System.out.println("玩家1開始加載..."); Thread.sleep(2000); System.out.println("玩家1加載完成"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } } static class playerTwo implements Runnable { CountDownLatch countDownLatch; public playerTwo(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void run() { try { System.out.println("玩家2開始加載..."); Thread.sleep(10000); System.out.println("玩家2加載完成"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } } }
實際項目中若是遇到讀取excel多個sheet頁籤而後彙總數據的狀況也能夠採用CountDownLanch。注意最後final的countDownLatch.countDown()方法,也是一個相似上面票據增減的方法。
CyclicBarrier柵欄的簡單使用:
CyclicBarrier和咱們上面的CountDownLanch差很少,都是開啓多個任務一塊兒去執行,不一樣的是CountDownLanch須要支線任務執行完成而後CountDownLanch作一個彙總,而後繼續運行後續程序。CyclicBarrier不須要作彙總。再就是CyclicBarrier是能夠重複的。
public class CyclicBarrierTest implements Runnable { private CyclicBarrier cyclicBarrier; private int index ; public CyclicBarrierTest(CyclicBarrier cyclicBarrier, int index) { this.cyclicBarrier = cyclicBarrier; this.index = index; } public void run() { try { System.out.println("index: " + index); index--; cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() { public void run() { System.out.println("全部特工到達屏障,準備開始執行祕密任務"); } }); for (int i = 0; i < 10; i++) { new Thread(new CyclicBarrierTest(cyclicBarrier, i)).start(); } cyclicBarrier.await(); System.out.println("所有到達屏障...."); } }
這個須要注意的是CyclicBarrier cyclicBarrier = new CyclicBarrier(11, 這個11,就是說必定有11個線程執行完畢,我才能夠執行後面的操做,咱們下面for循環是10,而咱們那裏寫的是11啊,別忘記還有一個主線程呢,因此說每次計算必定加一個主線程啊。
Exchanger的簡單使用
最後就是咱們Exchanger,平時使用的很少,咱們瞭解一下就能夠了,摟一眼代碼,就是線程之間的變量交換。
public static void main(String []args) { final Exchanger<Integer> exchanger = new Exchanger<Integer>(); for(int i = 0 ; i < 4 ; i++) { final Integer num = i; new Thread() { public void run() { System.out.println("我是線程:Thread_" + this.getName() + "個人數據是:" + num); try { Integer exchangeNum = exchanger.exchange(num); Thread.sleep(1000); System.out.println("我是線程:Thread_" + this.getName() + "我原先的數據爲:" + num + " , 交換後的數據爲:" + exchangeNum); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } }
總結:
此次咱們核心梳理了咱們的Semaphore的執行流程,內部是如何來實現咱們的票據計數,獲取,歸還等操做的,再就是咱們for無限循環會在兩次之後自動阻塞的設計思想,還有咱們的CountDownLanch、CyclicBarrier、Executors的基本使用,並賦予你們簡單的代碼流程,今天就說到這,明天咱們繼續來講咱們的多線程。