Java併發編程:8-Semaphore & AQS

前言:java

Lock & Condition中咱們學習了管程這種併發編程模型,在管程模型提出以前,信號量模型一直是併發編程領域的終結者,幾乎全部支持併發編程的語言都支持信號量機制,今天就來看看Java中的信號量實現--Semaphore。node

面試問題
Q :談談對Semaphore的理解?面試

ps:代碼中的註釋較多,建議瀏覽器縮放125%閱讀。

1.信號量模型

在正式開始前,咱們先簡單回顧一下管程模型,管程模型中對共享變量互斥訪問,只能有一個線程成功進入臨界區,其餘嘗試失敗的線程會在臨界區外等待區的入口等待隊列中等待,進入臨界區中的線程,若是須要等待某個條件變量,則會釋放鎖,喚醒入口等待隊列中等待的線程,同時在該條件變量對應的等待隊列中等待,線程也會進入等待狀態,直到被其餘線程喚醒,纔會從條件變量的等待隊列中移除,加入到入口等待隊列中從新嘗試進入臨界區。 編程

信號量與管程最大的不一樣:信號量能夠容許多個線程訪問同一個臨界區,而管程只容許一個線程訪問臨界區。segmentfault

信號量模型能夠簡單歸納:一個計數器、一個等待隊列,三個方法。在信號量模型裏,計數器和等待隊列對外是透明的,只能經過信號量模型提供的方法來訪問。瀏覽器

init(int permits):設置計數器的初始值,最多容許多少個線程同時訪問臨界區。併發

down(int permits):計數器的值減去permits個許可,若是計數器中暫時沒有足夠的許可,則將當前線程阻塞,並加入到等待隊列。高併發

up(int permits):計數器的值加上permits個許可,並根據歸還後許可的個數,喚醒等待隊列中一個線程,工具

舉個簡單的例子,在哲學家用餐問題中,咱們能夠經過破壞請求與保持條件,一次性申請全部的資源來解決死鎖問題,具體的操做就是把筷子都放桌子中央,須要吃飯的人一次拿兩根。筷子不夠兩根的話,進行等待。oop

桌子中央的5根筷子就至關於信號量中的5個許可,在Init()時設置,當某個哲學家打算用餐時,則經過down(2),拿走兩根筷子(許可),用餐完畢後up(2)歸還筷子,若是中間只剩一根筷子,那麼再執行down(2)並不會拿走剩下的這一根筷子,而是會進入等待隊列,當有筷子被歸還時,先排隊的人會嘗試去拿筷子,若是此時沒有其餘哲學家競爭的話,就能夠拿到筷子進行用餐,用餐結束後,歸還筷子,並通知排在他後邊的人能夠去拿了。

2.Semaphore使用

申請多少信號量,記得釋放多少信號量。

一旦(one egg) 進入等待隊列中,只有前驅節點釋放或取消後繼纔會被喚醒。

舉個例子:

線程A和線程B分別須要10個和5個許可,信號量一開始只剩4個許可,A先申請,申請不到,掛起A,B後申請,也申請不到,掛起B,注意,等待隊列中A是排在B前邊的。過了一會有線程歸還了1個許可,此時信號量中有5個空閒許可,而線程B也恰好須要5個。那麼是否是線程B就能夠被喚醒了?不是的,B線程只能被其前驅節點喚醒,在被喚醒前是出於掛起(等待)狀態的,對許可的個數變化是不知情的。

構造時傳入的許可數,並不表明信號量最大支持的許可數

構造傳入的許可數,表明的AQS先幫你指定初始的數量,你在後邊的使用中還能夠繼續經過release繼續往上加,只要不超過int最大值均可以。因此記得用多少,還多少。少還的話,其餘線程不夠用,多還則程序會拋出錯誤。

Semaphore(1)是否是能夠當ReentrantLock使用

看着功能挺像的,但內部實現徹底不一樣,一個是共享模式,一個是獨佔模式,建議不要這樣作。

3.Semaphore解析

Semaphorey底層依舊是經過AQS實現,其靜態內部抽象類Sync實現類AQS中共享模式的主要方法,FairSync與NonfairSync繼承自Sync,各自經過重寫tryAcquireShared分別實現了公平模式與非公平模式。

公平模式:在申請相同數量許可的前提下,調用acquire的順序就是獲取許可的順序;若是申請許可數量不一樣,那麼信號量會根據等待隊列中的順序,優先知足申請數量小於等於空閒數量的線程。

非公平模式:在進入等待隊列前嘗試去獲取許可,剛好此時有一個許可釋放,並被該線程申請到,那麼就不用進入等待隊列了。

Semaphore 對應的兩個構造方法以下:

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

在構造的時候必填permits,做爲信號量中許可的初始化個數。fair選填,默認非公平模式。

3.1 申請許可

在以前學習中,咱們更多關注的是工具類實現AQS的部分,沒有從總體的角度來學習它,在後邊的內容中,咱們試着去了解AQS是如何設計的,是怎樣經過簡單的實現就能夠自定義同步組件。

在咱們調用 semaphore.acquire()後,Semaphore調用的是內部的 sync.acquireSharedInterruptibly(1) 若是調用的是 acquire(int permits) 則會調用 sync.acquireSharedInterruptibly(permits)

acquireSharedInterruptibly()定義在AQS中,獲取可響應中斷的共享,代碼以下:

//AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //若是線程被中斷了,拋出中斷異常
        if (Thread.interrupted())
            throw new InterruptedException();
        /*
        下面這個方法名是否是很眼熟,在上一篇講ReadLock部分時主要介紹的就是這個方法,
        但二者的內容是徹底不一樣的,各個工具類經過實現各自的tryAcquireShared來提供不一樣的功能。
        tryAcquireShared方法表示嘗試去獲取,能成功是運氣好,失敗纔是常態。
        若是返回值<0,則嘗試獲取許可失敗,執行doAcquireSharedInterruptibly(arg);
        */
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared()在Semaphore中有公平模式和非公平模式兩種實現。

//非公平模式        
        final int nonfairTryAcquireShared(int acquires) {//要申請的許可個數
            for (;;) {
                //目前信號量中空閒的許可個數。
                int available = getState();
                //remaining表示通過當前操做後剩下空閒許可的個數,
                //remaining >= 0,表示能夠知足當前線程申請的許可數,申請成功
                //remaining < 0,沒法知足當前線程申請的許可數,申請失敗
                int remaining = available - acquires;
                if (remaining < 0 ||
                    //AQS中的volatile變量state在不一樣的工具類中有不一樣的含義,
                    //在Semaphore中表示剩餘的信號量。
                    compareAndSetState(available, remaining))
                    //返回剩餘的許可
                    return remaining;
            }
        }
//公平模式        
        protected int tryAcquireShared(int acquires) {
            for (;;) 
                /*
                公平模式與非公平模式一區別在於下邊這行代碼。
                公平就是前邊有線程在等待的話,當前線程須要排隊。
                hasQueuedPredecessors()用來判斷是否須要排隊
                */
                if (hasQueuedPredecessors())
                //若是須要排隊,爲了保證公平性,不進行嘗試獲取,直接返回
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
//AbstractQueuedSynchronizer    
    public final boolean hasQueuedPredecessors() {
        /*
        檢查信號量模型圖中的等待隊列,首節點是不是當前線程。
             ____          ____          ____
      head  | \\ |  -->   | t1 |  -->   | t2 |    tail
            |____|  <--   |____|  <--   |____|
            頭節點             首節點
        */
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

至此acquireSharedInterruptibly( arg)中的tryAcquireShared(arg) 方法執行完成,返回值大於等於0,可表示嘗試成功;小於0則嘗試失敗,會進入doAcquireSharedInterruptibly(arg)方法。

//AbstractQueuedSynchronizer   
    /*
    該方法是AQS中嘗試獲取共享失敗後的處理方法,上一篇中的ReadLock嘗試
    獲取讀鎖失敗後,也會執行該方法中定義的邏輯,並且還會作額外的中間檢查。
    */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //把當前線程封裝爲共享類型的節點添加至等待隊列,
        //該方法的具體操做見 代碼塊-1
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //返回當前線程節點的前一個節點,代碼塊-3
                final Node p = node.predecessor();
                //只要p==head時,也就是當前線程節點爲等待隊列中的首節點,則能夠嘗試獲取共享狀態
                if (p == head) {
                    //這裏再次去嘗試獲取arg個共享狀態,也就是arg個許可
                    int r = tryAcquireShared(arg);
                    //CAS已經成功修改了信號量,獲取到了須要的許可數
                    if (r >= 0) {
                        /*
                        當前線程節點目前處於等待隊列中的首節點,獲取後則須要讓出首節點的位置,
                        其餘線程成爲首節點,才能進行嘗試獲取共享狀態的操做。
                        setHeadAndPropagate()即是如何將當前線程節點的下一個節點
                        變成首節點的方法,而且若是後繼節點是共享類型,還會喚醒後繼節點方法
                        具體內容在代碼塊-4。
                        */
setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }

                /*
                代碼塊-7,從最開始的嘗試獲取到添加節點後判斷是否爲頭節點,
                兩次嘗試都失敗,則會在此處判斷當前線程是否應該被掛起(能夠理解爲進入等待)
                在此處會將前驅節點的waitStatus設置爲SIGNAL,表示當前節點須要被喚醒
                而具體的喚醒方法是由前驅節點來調用的,能夠理解爲,在排隊時你告訴你前邊的人
                讓他買完後叫你一下(設置前驅節點的waitStatus爲SIGNAL),
                這樣你就能夠低頭玩手機了(掛起線程)。
                   */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //代碼塊-8,掛起當前線程
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

代碼塊-1:

//AbstractQueuedSynchronizer 
    //向等待隊列尾部中添加節點,在
    private Node addWaiter(Node mode) {//前邊傳過來的節點類型 Node.SHARED
        //將當前線程封裝爲共享類型的節點。
        Node node = new Node(Thread.currentThread(), mode);
        //pred表示前驅節點,指向當前隊列的尾節點。
        Node pred = tail;
        //當前隊列尾節點不爲空,表示當前隊列不爲空。
        if (pred != null) {
            //將當前線程節點的前驅節點指向尾節點。
            //高併發時,多個線程節點的前驅指向同一個尾節點,但最後CAS只能成功一個。
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                //CAS成功設置尾節點後,纔將pred節點的後繼指向肯定後的尾節點。
                pred.next = node;
                return node;
            }
        }
        //前邊隊列尾節點爲空或CAS失敗後會執行該方法。
        enq(node);
        return node;
    }

代碼塊-2:

//AbstractQueuedSynchronizer 
    //自旋向隊列末尾添加節點,若是隊列爲空則初始化等待隊列。
    private Node enq(final Node node) {
        //這裏是一個CAS自旋操做
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //初始化頭節點
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

代碼塊-3:

//AbstractQueuedSynchronizer的靜態內部Node
    //返回當前線程節點的前一個節點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

代碼塊-4:

//AbstractQueuedSynchronizer
    //方法名稱直譯爲設置頭節點和傳播
    //參數propagate是獲取成功後剩餘的許可,node則是獲取的許可的線程
    private void setHeadAndPropagate(Node node, int propagate) {
        //下邊的檢查須要用到舊的頭節點
        Node h = head;  
        //設置結果以下圖
        setHead(node);
        /*      h               head
             ____          ____          ____
            | \\ |  -->   | t1 |  -->   | t2 |    tail
            |___ |  <--   |____|  <--   |___ |
                              頭節點           首節點

         * 這裏是喚醒後續節點的健壯性判斷。  
         * 在JDK 6u11,6u17中,只判斷了propagate > 0 && node.waitStatus != 0
         * 會致使併發釋放信號量所致使部分請求信號量的線程沒法被喚醒的問題,
         * 詳見 BUG – JDK-6801020 
         * 僅用propagate > 0 判斷是否喚醒後續節點是不充分的
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //後續節點爲共享類型,則喚醒該節點
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

代碼塊-5:

//AbstractQueuedSynchronizer
    //該方法用於在 acquires/releases 存在競爭的狀況下,確保喚醒動做向後傳播
    private void doReleaseShared() {
          /*
         * 下面的循環在 head 節點存在後繼節點的狀況下,作了兩件事情:
         * 1. 若是 head 節點等待狀態爲 SIGNAL,則將 head 節點狀態設爲 0,並喚醒後繼節點
         * 2. 若是 head 節點等待狀態爲 0,則將 head 節點狀態設爲 PROPAGATE,保證喚醒可以正
         *    常傳播下去。
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //代碼塊-6 喚醒後續節點
                    unparkSuccessor(h);
                }
/* 
             * ws = 0 的狀況下,這裏要嘗試將狀態從 0 設爲 PROPAGATE,保證喚醒向後
             * 傳播。setHeadAndPropagate 在讀到 h.waitStatus < 0 時,能夠繼續喚醒
             * 後面的節點。
             */
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

代碼塊-6:

//AbstractQueuedSynchronizer
    //喚醒後續節點
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 找到下一個沒有被取消的節點喚醒
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

代碼塊-7:

//AbstractQueuedSynchronizer
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        /*
         * static final int CANCELLED =  1;
           * static final int SIGNAL    = -1;
         * static final int CONDITION = -2;
         * static final int PROPAGATE = -3;

         * 線程節點在被建立的時候,waitStatus默認爲0
         * 因此第一次進人該方法必定會返回false,在返回前設置爲SIGNAL
         * 下次再進入該方法的時候纔會返回true
         */
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //返回true,後續會掛起線程
            return true;
        if (ws > 0) {
            //若是前驅節點取消了,那麼會一直找到前邊沒被取消的節點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
/*
             * 只有waitStatus爲0時或者爲-3時,才能到這裏。
             * 第一次進入該方法時,調用方須要再確認一下,若是仍是獲取不到再掛起。
             * 返回false,從新回到doAcquireSharedInterruptibly中自旋,
             * 再次看看本身是否能夠獲取,獲取不到會再次進入該方法。
             * 下次進入時,waitStatus就等於SIGNAL了,該方法就能夠返回true
             * 線程纔會被真正掛起,進入等待。
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

代碼塊-8:

//AbstractQueuedSynchronizer
    private final boolean parkAndCheckInterrupt() {
         //調用LockSupport掛起當前線程
        LockSupport.park(this);
        //被喚醒後會返回等待中是否被中斷         
        return Thread.interrupted();
    }

3.2 釋放許可

跟前邊獲取許可的代碼相比,釋放許可就很輕鬆了。

//AbstractQueuedSynchronizer
    public final boolean releaseShared(int arg) {
        //嘗試去釋放許可
        if (tryReleaseShared(arg)) {
            //代碼塊-5,
            doReleaseShared();
            return true;
        }
        return false;
    }
//Semaphore
    //嘗試獲取的時候是嘗試一次,嘗試釋放的時候是不停的嘗試直到成功
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

3.3 小結

非公平模式調用獲取許可方法,先CAS嘗試。嘗試失敗後進入doAcquireSharedInterruptibly()

在該方法中,首先會向等待隊列的隊尾添加新的共享類型節點,新節點waitStatus爲0。

檢查是否爲首節點,是的話再嘗試一次,嘗試失敗或不是首節點,執行shouldParkAfterFailedAcquire()

該方法中會設置前驅節點的等待狀態爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL)

若是前驅節點waitStatus爲-2(取消狀態),則會從後向前遍歷找到未取消的前驅節點 ,而後再設置爲SIGNAL。

設置成功後,就能夠安心掛起了,反正有人通知。

被喚醒後先檢查中斷狀態,若是未被中斷則會回到doAcquireSharedInterruptibly()中的自旋操做。

繼續檢查是否爲首節點,不是的話繼續掛起,是首節點,而且嘗試成功後,返會剩餘許可數int r = tryAcquireShared(arg)

共享型節點在喚醒後還須要將這個喚醒操做傳遞給後繼結點,這也是與獨佔型節點的區別。

獨佔模式在此處只須要從新設置head節點,共享模式則在設置head節點的同時還要向後繼傳播喚醒。

setHeadAndPropagate(),將當前線程節點設置爲head節點,若是出現如下狀況則會向後傳播喚醒:

  • 還有剩餘的許可。
  • 舊的head節點爲空或者waitStatus小於0,也就是SIGNAL和PROPAGATE這兩種狀態。
  • 新的head節點也就是當前線程節點的爲空或者waitStatus小於0。

具體的傳播動做定義在doReleaseShared()中。這個方法其實也是釋放許可所使用的核心方法。

在該方法中會在head節點存在後繼節點的狀況下,作兩件事:

  1. 若是 head 節點等待狀態爲 SIGNAL,則將 head 節點狀態設爲 0,並喚醒後繼節點
  2. 若是 head 節點等待狀態爲 0,則將 head 節點狀態設爲 PROPAGATE,保證喚醒可以正
    常傳播下去。

喚醒後續節點後,doAcquireSharedInterruptibly()主要流程就完了。

非公平模式調用釋放許可方法,會進入releaseShared(),嘗試釋放鎖直到成功。

後續的執行前邊提到的doReleaseShared()

4.總結

這篇文章把共享鎖的整個流程走了一遍,其中有不少實現的細節有待深刻,好比設置state時何時須要用CAS,何時不需用;在添加節點時,爲何先將當前節點指向尾節點,等CAS修改爲功後,再將尾節點指向當前節點,還有等待隊列爲何是從後向前遍歷等等。

限於本人的能力,這些實現細節尚未更深刻的理解,目前只能看懂大概流程。後邊會專門寫一篇AQS的文章,來對J.U.C中的工具類作一個總結,也這些細節也進行一一剖析。

附上學習AQS的一點心得:

第一階段:先熟悉一下各個方法大概都是作什麼的,內心有個底。

第二階段:Debug!一邊跟方法,一邊要記住等待隊列中的各個節點的waitStatus以及head和tail的指向的變化,最好能畫成圖,熟悉AQS流程和waitStatus的變化。

第三階段:熟悉了總體流程後,經過線程斷點控制線程的執行流程,其實就是人工模擬CPU切換線程,使線程走到AQS中以前沒有經過的判斷邏輯中,看看會發生什麼,好比setHeadAndPropagate()方法中那一長串的判斷。

第四階段:向 Doug Lea 致敬 !

Reference

  http://www.tianxiaobo.com

感謝閱讀

相關文章
相關標籤/搜索