精美圖文講解Java AQS 共享式獲取同步狀態以及Semaphore的應用

| 好看請贊,養成習慣html

  • 你有一個思想,我有一個思想,咱們交換後,一我的就有兩個思想
  • If you can NOT explain it simply, you do NOT understand it well enough

看到本期內容這麼少,是否是心動了呢?java

前言

上一篇萬字長文 Java AQS隊列同步器以及ReentrantLock的應用 爲咱們讀 JUC 源碼以及其設計思想作了足夠多的鋪墊,接下來的內容我將重點說明差別化,若是有些童鞋不是能很好的理解文中的一些內容,強烈建議回看上一篇文章,搞懂基礎內容,接下來的閱讀真會輕鬆加愉快node


AQS 中咱們介紹了獨佔式獲取同步狀態的多種情形:面試

  • 獨佔式獲取鎖
  • 可響應中斷的獨佔式獲取鎖
  • 有超時限制的獨佔式獲取鎖

AQS 提供的模版方法裏面還差共享式獲取同步狀態沒有介紹,因此咱們今天來揭開這個看似神祕的面紗編程

AQS 中的共享式獲取同步狀態

獨佔式是你中沒我,我中沒你的的一種互斥形式,共享式顯然就不是這樣了,因此他們的惟一區別就是:併發

同一時刻可否有多個線程同時獲取到同步狀態

簡單來講,就是這樣滴:oop

咱們知道同步狀態 state 是維護在 AQS 中的,拋開可重入鎖的概念,我在上篇文章中也提到了,獨佔式和共享式控制同步狀態 state 的區別僅僅是這樣:源碼分析

因此說想了解 AQS 的 xxxShared 的模版方法,只須要知道它是怎麼控制 state 的就行了性能

AQS共享式獲取同步狀態源碼分析

爲了幫助你們更好的回憶內容,我將上一篇文章的兩個關鍵內容粘貼在此處,幫助你們快速回憶,關於共享式,你們只須要關注【騷紫色】就能夠了ui

自定義同步器須要重寫的方法

AQS 提供的模版方法

故事就從這裏提及吧 (你會發現和獨佔式驚人的類似),關鍵代碼都加了註釋

public final void acquireShared(int arg) {
          // 一樣調用自定義同步器須要重寫的方法,非阻塞式的嘗試獲取同步狀態,若是結果小於零,則獲取同步狀態失敗
        if (tryAcquireShared(arg) < 0)
              // 調用 AQS 提供的模版方法,進入等待隊列
            doAcquireShared(arg);
    }

進入 doAcquireShared 方法:

private void doAcquireShared(int arg) {
          // 建立共享節點「SHARED」,加到等待隊列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
              // 進入「自旋」,這裏並非純粹意義上的死循環,在獨佔式已經說明過
            for (;;) {
                  // 一樣嘗試獲取當前節點的前驅節點
                final Node p = node.predecessor();
                  // 若是前驅節點爲頭節點,嘗試再次獲取同步狀態
                if (p == head) {
                      // 在此以非阻塞式獲取同步狀態
                    int r = tryAcquireShared(arg);
                      // 若是返回結果大於等於零,才能跳出外層循環返回
                    if (r >= 0) {
                          // 這裏是和獨佔式的區別
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面代碼第 18 行咱們提到和獨佔式獲取同步狀態的區別,貼心的給你們一個更直觀的對比:

差異只在這裏,因此咱們就來看看 setHeadAndPropagate(node, r) 到底幹了什麼,我以前說過 JDK 源碼中的方法命名絕大多數仍是很是直觀的,該方法直譯過來就是 【設置頭而且傳播/繁衍】。獨佔式只是設置了頭,共享式除了設置頭還多了一個傳播,你的疑問應該已經來了:

啥是傳播,爲何會有傳播這個設置呢?

想了解這個問題,你須要先知道非阻塞共享式獲取同步狀態返回值的含義:

這裏說的傳播其實說的是 propagate > 0 的狀況,道理也很簡單,當前線程獲取同步狀態成功了,還有剩餘的同步狀態可用於其餘線程獲取,那就要通知在等待隊列的線程,讓他們嘗試獲取剩餘的同步狀態

若是要讓等待隊列中的線程獲取到通知,須要線程調用 release 方法實現的。接下來,咱們走近 setHeadAndPropagate 一探究竟,驗證一下

// 入參,node: 當前節點
    // 入參,propagate:獲取同步狀態的結果值,即上面方法中的變量 r
    private void setHeadAndPropagate(Node node, int propagate) {
            // 記錄舊的頭部節點,用於下面的check
        Node h = head; 
            // 將當前節點設置爲頭節點
        setHead(node);
        
            // 經過 propagate 的值和 waitStatus 的值來判斷是否能夠調用 doReleaseShared 方法
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
              // 若是後繼節點爲空或者後繼節點爲共享類型,則進行喚醒後繼節點
                    // 這裏後繼節點爲空意思是隻剩下當前頭節點了,另外這裏的 s == null 也是判斷空指針的標準寫法
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

上面方法的大方向做用咱們瞭解了,可是代碼中什麼時候調用 doReleaseShared 的判斷邏輯仍是挺讓人費解的,爲何會有這麼一大堆的判斷,咱們來逐個分析一下:

這裏的空判斷有點讓人頭大,咱們先挑出來講明一下:

排除了其餘判斷條件的干擾,接下來咱們就專一分析 propagate 和 waitStatus 兩個判斷條件就能夠了,這裏再將 waitStatus 的幾種狀態展現在這裏,幫助你們理解,【騷粉色】是咱們一會要用到的:

propagate > 0

上面已經說過了,若是成立,直接短路後續判斷,而後根據 doReleaseShared 的判斷條件進行釋放

propagate > 0 不成立, h.waitStatus < 0 成立 (注意這裏的h是舊的頭節點)

何時 h.waitStatus < 0 呢?拋開 CONDITION 的使用,只剩下 SIGNAL 和 PROPAGATE,想知道這個答案,須要提早看一下 doReleaseShared() 方法了:

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                      // CAS 將頭節點的狀態設置爲0                
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 設置成功後才能跳出循環喚醒頭節點的下一個節點
                      unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         // 將頭節點狀態CAS設置成 PROPAGATE 狀態
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

doReleaseShared() 方法中能夠看出:

  • 若是讓 h.waitStatus < 0 成立,只能將其設置成 PROPAGATE = -3 的狀況,設置成功的前提是 h 頭節點 expected 的狀態是 0;
  • 若是 h.waitStatus = 0,是上述代碼第 8 行 CAS 設置成功,而後喚醒等待中的線程

因此猜想,當前線程執行到 h.waitStatus < 0 的判斷前,有另一個線程恰好執行了 doReleaseShared() 方法,將 waitStatus 又設置成PROPAGATE = -3

這個理解有點繞,咱們仍是來畫個圖理解一下吧:

可能有同窗仍是不太能理解這麼寫的道理,咱們一直說 propagate <> = 0 的狀況,propagate = 0 表明的是當時/當時/當時 嘗試獲取同步狀態沒成功,可是以後可能又有共享狀態被釋放了,因此上面的邏輯是以防這種萬一,你懂的,嚴謹的併發就是要防止一切萬一,如今結合這個情景再來理解上面的判斷你是否豁然開朗了呢?

繼續向下看,

前序條件不成立,(h = head) == null || h.waitStatus < 0 注意這裏的h是新的頭節點)

有了上面鋪墊,這個就直接畫個圖就更好理解啦,其實就是沒有那麼巧有另一個線程摻合了

相信到這裏你應該理解共享式獲取同步狀態的所有過程了吧,至於非阻塞共享式獲取同步狀態帶有超時時間獲取同步狀態,結合本文講的 setHeadAndPropagate 邏輯和獨佔式獲取同步狀態的實現過程過程來看,真是一毛同樣,這裏就再也不累述了,趕忙打開你的 IDE 去驗證一下吧

咱們分析了AQS 的模版方法,還一直沒說 tryAcquireShared(arg) 這個方法是如何被重寫的,想要了解這個,咱們就來看一看共享式獲取同步狀態的經典應用 Semaphore

Semaphore 的應用及源碼分析

Semaphore 概念

Semaphore 中文多翻譯爲 【信號量】,我還特地查了一下劍橋辭典的英文解釋:

其實就是信號標誌(two flags),好比紅綠燈,每一個交通燈產生兩種不一樣行爲

  • Flag1-紅燈:停車
  • Flag2-綠燈:行車

在 Semaphore 裏面,何時是紅燈,何時是綠燈,其實就是靠 tryAcquireShared(arg) 的結果來表示的

  • 獲取不到共享狀態,即爲紅燈
  • 獲取到共享狀態,即爲綠燈

因此咱們走近 Semaphore ,來看看它究竟是怎麼應用 AQS 的,又是怎樣重寫 tryAcquireShared(arg) 方法的

Semaphore 源碼分析

先看一下類結構

看到這裏你是否有點跌眼鏡,和 ReentrantLock 類似的可怕吧,若是你有些陌生,再次強烈建議你回看上一篇文章 Java AQS隊列同步器以及ReentrantLock的應用 ,這裏直接提速對比看公平和非公平兩種重寫的 tryAcquireShared(arg) 方法,沒有意外,公平與否,就是判斷是否有前驅節點

方法內部只是計算 state 的剩餘值,那 state 的初始值是多少怎麼設置呢?固然也就是構造方法了:

public Semaphore(int permits) {
          // 默認還是非公平的同步器,至於爲何默認是非公平的,在上一篇文章中也特地說明過
        sync = new NonfairSync(permits);
    }
    
    NonfairSync(int permits) {
            super(permits);
    }

super 方法,就會將初始值給到 AQS 中的 state

也許你發現了,當咱們把 permits 設置爲1 的時候,不就是 ReentrantLock 的互斥鎖了嘛,說的一點也沒錯,咱們用 Semaphore 也能實現基本互斥鎖的效果

static int count;
//初始化信號量
static final Semaphore s 
    = new Semaphore(1);
//用信號量保證互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

But(英文聽力中的重點),Semaphore 確定不是爲這種特例存在的,它是共享式獲取同步狀態的一種實現。若是使用信號量,咱們一般會將 permits 設置成大於1的值,不知道你是否還記得我曾在 爲何要使用線程池? 一文中說到的池化概念,在同一時刻,容許多個線程使用鏈接池,每一個鏈接被釋放以前,不容許其餘線程使用。因此說 Semaphore 能夠容許多個線程訪問一個臨界區,最終很好的作到一個限流/限流/限流 的做用

雖然 Semaphore 能很好的提供限流做用,說實話,Semaphore 的限流做用比較單一,我在實際工做中使用 Semaphore 並非不少,若是真的要用高性能限流器,Guava RateLimiter 是一個很是不錯的選擇,咱們後面會作分析,有興趣的能夠提早了解一下

關於 Semaphore 源碼,就這麼三下五除二的結束了

總結

不知你有沒有感受到,咱們的節奏明顯加快了,好多原來分散的點在被瘋狂的串聯起來,若是按照這個方式來閱讀 JUC 源碼,相信你也不會一頭扎進去迷失方向,而後沮喪的退出 JUC 吧,而後面試背誦答案,而後忘記,而後再背誦?

跟上節奏,關於共享式獲取同步狀態,Semaphore 只不過是很是經典的應用,ReadWriteLock 和 CountDownLatch 平常應用仍是很是普遍的,咱們接下來就陸續聊聊它們吧

靈魂追問

  1. Semaphore 的 permits 設置成1 「等同於」 簡單的互斥鎖實現,那它和 ReentrantLock 的區別仍是挺大的,都有哪些區別呢?
  2. 你在項目中是如何使用 Semaphore 的呢?

參考

  1. Java 併發實戰
  2. Java 併發編程的藝術
  3. https://blog.csdn.net/anlian5...

日拱一兵 | 原創

相關文章
相關標籤/搜索