| 好看請贊,養成習慣html
- 你有一個思想,我有一個思想,咱們交換後,一我的就有兩個思想
- If you can NOT explain it simply, you do NOT understand it well enough
看到本期內容這麼少,是否是心動了呢?java
上一篇萬字長文 Java AQS隊列同步器以及ReentrantLock的應用 爲咱們讀 JUC 源碼以及其設計思想作了足夠多的鋪墊,接下來的內容我將重點說明差別化,若是有些童鞋不是能很好的理解文中的一些內容,強烈建議回看上一篇文章,搞懂基礎內容,接下來的閱讀真會輕鬆加愉快node
AQS 中咱們介紹了獨佔式獲取同步狀態的多種情形:面試
AQS 提供的模版方法裏面還差共享式獲取同步狀態沒有介紹,因此咱們今天來揭開這個看似神祕的面紗編程
獨佔式是你中沒我,我中沒你的的一種互斥形式,共享式顯然就不是這樣了,因此他們的惟一區別就是:併發
同一時刻可否有多個線程同時獲取到同步狀態
簡單來講,就是這樣滴:oop
咱們知道同步狀態 state 是維護在 AQS 中的,拋開可重入鎖的概念,我在上篇文章中也提到了,獨佔式和共享式控制同步狀態 state 的區別僅僅是這樣:源碼分析
因此說想了解 AQS 的 xxxShared 的模版方法,只須要知道它是怎麼控制 state 的就行了性能
爲了幫助你們更好的回憶內容,我將上一篇文章的兩個關鍵內容粘貼在此處,幫助你們快速回憶,關於共享式,你們只須要關注【騷紫色】就能夠了ui
故事就從這裏提及吧 (你會發現和獨佔式驚人的類似),關鍵代碼都加了註釋
public final void acquireShared(int arg) { // 一樣調用自定義同步器須要重寫的方法,非阻塞式的嘗試獲取同步狀態,若是結果小於零,則獲取同步狀態失敗 if (tryAcquireShared(arg) < 0) // 調用 AQS 提供的模版方法,進入等待隊列 doAcquireShared(arg); }
進入 doAcquireShared
方法:
private void doAcquireShared(int arg) { // 建立共享節點「SHARED」,加到等待隊列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 進入「自旋」,這裏並非純粹意義上的死循環,在獨佔式已經說明過 for (;;) { // 一樣嘗試獲取當前節點的前驅節點 final Node p = node.predecessor(); // 若是前驅節點爲頭節點,嘗試再次獲取同步狀態 if (p == head) { // 在此以非阻塞式獲取同步狀態 int r = tryAcquireShared(arg); // 若是返回結果大於等於零,才能跳出外層循環返回 if (r >= 0) { // 這裏是和獨佔式的區別 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上面代碼第 18 行咱們提到和獨佔式獲取同步狀態的區別,貼心的給你們一個更直觀的對比:
差異只在這裏,因此咱們就來看看 setHeadAndPropagate(node, r)
到底幹了什麼,我以前說過 JDK 源碼中的方法命名絕大多數仍是很是直觀的,該方法直譯過來就是 【設置頭而且傳播/繁衍】。獨佔式只是設置了頭,共享式除了設置頭還多了一個傳播,你的疑問應該已經來了:
啥是傳播,爲何會有傳播這個設置呢?
想了解這個問題,你須要先知道非阻塞共享式獲取同步狀態返回值的含義:
這裏說的傳播其實說的是 propagate > 0
的狀況,道理也很簡單,當前線程獲取同步狀態成功了,還有剩餘的同步狀態可用於其餘線程獲取,那就要通知在等待隊列的線程,讓他們嘗試獲取剩餘的同步狀態
若是要讓等待隊列中的線程獲取到通知,須要線程調用 release 方法實現的。接下來,咱們走近 setHeadAndPropagate
一探究竟,驗證一下
// 入參,node: 當前節點 // 入參,propagate:獲取同步狀態的結果值,即上面方法中的變量 r private void setHeadAndPropagate(Node node, int propagate) { // 記錄舊的頭部節點,用於下面的check Node h = head; // 將當前節點設置爲頭節點 setHead(node); // 經過 propagate 的值和 waitStatus 的值來判斷是否能夠調用 doReleaseShared 方法 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 若是後繼節點爲空或者後繼節點爲共享類型,則進行喚醒後繼節點 // 這裏後繼節點爲空意思是隻剩下當前頭節點了,另外這裏的 s == null 也是判斷空指針的標準寫法 if (s == null || s.isShared()) doReleaseShared(); } }
上面方法的大方向做用咱們瞭解了,可是代碼中什麼時候調用 doReleaseShared
的判斷邏輯仍是挺讓人費解的,爲何會有這麼一大堆的判斷,咱們來逐個分析一下:
這裏的空判斷有點讓人頭大,咱們先挑出來講明一下:
排除了其餘判斷條件的干擾,接下來咱們就專一分析 propagate 和 waitStatus 兩個判斷條件就能夠了,這裏再將 waitStatus 的幾種狀態展現在這裏,幫助你們理解,【騷粉色】是咱們一會要用到的:
propagate > 0
上面已經說過了,若是成立,直接短路後續判斷,而後根據 doReleaseShared 的判斷條件進行釋放
propagate > 0 不成立, h.waitStatus < 0 成立 (注意這裏的h是舊的頭節點)
何時 h.waitStatus < 0 呢?拋開 CONDITION 的使用,只剩下 SIGNAL 和 PROPAGATE,想知道這個答案,須要提早看一下 doReleaseShared()
方法了:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // CAS 將頭節點的狀態設置爲0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 設置成功後才能跳出循環喚醒頭節點的下一個節點 unparkSuccessor(h); } else if (ws == 0 && // 將頭節點狀態CAS設置成 PROPAGATE 狀態 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
從 doReleaseShared()
方法中能夠看出:
因此猜想,當前線程執行到 h.waitStatus < 0 的判斷前,有另一個線程恰好執行了 doReleaseShared()
方法,將 waitStatus 又設置成PROPAGATE = -3
這個理解有點繞,咱們仍是來畫個圖理解一下吧:
可能有同窗仍是不太能理解這麼寫的道理,咱們一直說 propagate <> = 0 的狀況,propagate = 0 表明的是當時/當時/當時 嘗試獲取同步狀態沒成功,可是以後可能又有共享狀態被釋放了,因此上面的邏輯是以防這種萬一,你懂的,嚴謹的併發就是要防止一切萬一,如今結合這個情景再來理解上面的判斷你是否豁然開朗了呢?
繼續向下看,
前序條件不成立,(h = head) == null || h.waitStatus < 0 注意這裏的h是新的頭節點)
有了上面鋪墊,這個就直接畫個圖就更好理解啦,其實就是沒有那麼巧有另一個線程摻合了
相信到這裏你應該理解共享式獲取同步狀態的所有過程了吧,至於非阻塞共享式獲取同步狀態和帶有超時時間獲取同步狀態,結合本文講的 setHeadAndPropagate 邏輯和獨佔式獲取同步狀態的實現過程過程來看,真是一毛同樣,這裏就再也不累述了,趕忙打開你的 IDE 去驗證一下吧
咱們分析了AQS 的模版方法,還一直沒說 tryAcquireShared(arg)
這個方法是如何被重寫的,想要了解這個,咱們就來看一看共享式獲取同步狀態的經典應用 Semaphore
Semaphore 中文多翻譯爲 【信號量】,我還特地查了一下劍橋辭典的英文解釋:
其實就是信號標誌(two flags),好比紅綠燈,每一個交通燈產生兩種不一樣行爲
在 Semaphore 裏面,何時是紅燈,何時是綠燈,其實就是靠 tryAcquireShared(arg)
的結果來表示的
因此咱們走近 Semaphore ,來看看它究竟是怎麼應用 AQS 的,又是怎樣重寫 tryAcquireShared(arg)
方法的
先看一下類結構
看到這裏你是否有點跌眼鏡,和 ReentrantLock 類似的可怕吧,若是你有些陌生,再次強烈建議你回看上一篇文章 Java AQS隊列同步器以及ReentrantLock的應用 ,這裏直接提速對比看公平和非公平兩種重寫的 tryAcquireShared(arg)
方法,沒有意外,公平與否,就是判斷是否有前驅節點
方法內部只是計算 state 的剩餘值,那 state 的初始值是多少怎麼設置呢?固然也就是構造方法了:
public Semaphore(int permits) { // 默認還是非公平的同步器,至於爲何默認是非公平的,在上一篇文章中也特地說明過 sync = new NonfairSync(permits); } NonfairSync(int permits) { super(permits); }
super 方法,就會將初始值給到 AQS 中的 state
也許你發現了,當咱們把 permits 設置爲1 的時候,不就是 ReentrantLock 的互斥鎖了嘛,說的一點也沒錯,咱們用 Semaphore 也能實現基本互斥鎖的效果
static int count; //初始化信號量 static final Semaphore s = new Semaphore(1); //用信號量保證互斥 static void addOne() { s.acquire(); try { count+=1; } finally { s.release(); } }
But(英文聽力中的重點),Semaphore 確定不是爲這種特例存在的,它是共享式獲取同步狀態的一種實現。若是使用信號量,咱們一般會將 permits 設置成大於1的值,不知道你是否還記得我曾在 爲何要使用線程池? 一文中說到的池化概念,在同一時刻,容許多個線程使用鏈接池,每一個鏈接被釋放以前,不容許其餘線程使用。因此說 Semaphore 能夠容許多個線程訪問一個臨界區,最終很好的作到一個限流/限流/限流 的做用
雖然 Semaphore 能很好的提供限流做用,說實話,Semaphore 的限流做用比較單一,我在實際工做中使用 Semaphore 並非不少,若是真的要用高性能限流器,Guava RateLimiter 是一個很是不錯的選擇,咱們後面會作分析,有興趣的能夠提早了解一下
關於 Semaphore 源碼,就這麼三下五除二的結束了
不知你有沒有感受到,咱們的節奏明顯加快了,好多原來分散的點在被瘋狂的串聯起來,若是按照這個方式來閱讀 JUC 源碼,相信你也不會一頭扎進去迷失方向,而後沮喪的退出 JUC 吧,而後面試背誦答案,而後忘記,而後再背誦?
跟上節奏,關於共享式獲取同步狀態,Semaphore 只不過是很是經典的應用,ReadWriteLock 和 CountDownLatch 平常應用仍是很是普遍的,咱們接下來就陸續聊聊它們吧
日拱一兵 | 原創