原理剖析(第 006 篇)Semaphore工做原理分析

原理剖析(第 006 篇)Semaphore工做原理分析

1、大體介紹

一、在前面章節瞭解了CAS、AQS後,想必你們已經對這塊知識有了深入的瞭解了;
二、而JDK中有一個關於信號量的工具類,它也是基於AQS實現的,能夠認爲是synchronized的升級版(結尾處會講解到);
三、那麼本章節就和你們分享分析一下JDK1.8的Semaphore的工做原理; 

2、簡單認識Semaphore

2.1 何爲Semaphore?

一、Semaphore顧名思義,叫信號量;

二、Semaphore可用來控制同時訪問特定資源的線程數量,以此來達到協調線程工做;

三、Semaphore內部也有公平鎖、非公平鎖的靜態內部類,就像ReentrantLock同樣,Semaphore內部基本上是經過sync.xxx之類的這種調用方式的;

四、Semaphore內部維護了一個虛擬的資源池,若是許可爲0則線程阻塞等待,直到許可大於0時又能夠有機會獲取許可了;

2.2 Semaphore的state關鍵詞

一、其實Semaphore的實現也偏偏很好利用了其父類AQS的state變量值;

二、初始化一個數量值做爲許可池的資源,假設爲N,那麼當任何線程獲取到資源時,許可減1,直到許可爲0時後續來的線程就須要等待;

三、Semaphore,簡單大體意思爲:A、B、C、D線程同時爭搶資源,目前卡槽大小爲2,若A、B正在執行且未執行完,那麼C、D線程在門外等着,一旦A、B有1個執行完了,那麼C、D就會競爭看誰先執行;
   state初始值假設爲N,後續每tryAcquire()一次,state會CAS減1,當state爲0時其它線程處於等待狀態,
   直到state>0且<N後,進程又能夠獲取到鎖進行各自操做了;

2.3 經常使用重要的方法

一、public Semaphore(int permits)
   // 建立一個給定許可數量的信號量對象,且默認以非公平鎖方式獲取資源

二、public Semaphore(int permits, boolean fair)
   // 建立一個給定許可數量的信號量對象,且是否公平方式由傳入的fair布爾參數值決定

三、public void acquire() 
   // 今後信號量獲取一個許可,當許可數量小於零時,則阻塞等待
   
四、public void acquire(int permits)  
   // 今後信號量獲取permits個許可,當許可數量小於零時,則阻塞等待,可是當阻塞等待的線程被喚醒後發現被中斷過的話則會拋InterruptedException異常
   
五、public void acquireUninterruptibly(int permits)   
   // 今後信號量獲取permits個許可,當許可數量小於零時,則阻塞等待,可是當阻塞等待的線程被喚醒後發現被中斷過的話不會拋InterruptedException異常
   
六、public void release()
   // 釋放一個許可

七、public void acquire(int permits)
   // 釋放permits個許可

2.4 設計與實現僞代碼

一、獲取共享鎖:
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

	acquire{
		若是檢測中斷狀態發現被中斷過的話,那麼則拋出InterruptedException異常
		若是嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各類方式由AQS的子類實現 ),
		那麼就新增共享鎖結點經過自旋操做加入到隊列中,而且根據結點中的waitStatus來決定是否調用LockSupport.park進行休息
	}
	
	
二、釋放共享鎖:
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
	
	release{
		若是嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各類方式由AQS的子類實現 ),
		那麼經過自旋操做喚完成阻塞線程的喚起操做
	}

2.五、Semaphore生活細節化理解

好比咱們每天在外面吃快餐,我就以吃快餐爲例生活化闡述該Semaphore原理:

一、場景:餐廳只有一個排隊的走廊,只有十個打飯窗口;

二、開飯時間點,剛開始的時候,人數很少,屈指可數,窗口不少,打飯菜天然很快,但隨着時間的推移人數會愈來愈多,會呈現阻塞擁擠情況,排起了慢慢長隊;

三、人數愈來愈多,但窗口只有十個,後來的就只好按先來後到排隊等待打飯菜咯,前面窗口空缺一個,排隊最前的一個則上去打飯菜,秩序有條不紊;

四、總之你們都挨個挨個排隊打飯,先來後到,相安無事的順序打飯菜;

五、到此打止,一、二、三、4能夠認爲是一種公平方式的信號量共享鎖;

六、可是呢,還有那麼些緊急趕時間的人,來餐廳時恰好看到師傅剛剛打完一我的的飯菜,因而插入去打飯菜敢時間;

七、若是敢時間人的來的時候發現師傅還在打飯菜,那麼就只得乖乖的排隊等候打飯菜咯;

八、到此打止,一、二、六、7能夠認爲是一種非公平方式的信號量共享鎖;

3、源碼分析Semaphore

3.一、Semaphore構造器

一、構造器源碼:
    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
	
二、建立一個給定許可數量的信號量對象,默認使用非公平鎖,固然也可經過fair布爾參數值決定是公平鎖仍是非公平鎖;

3.二、Sync同步器

一、AQS --> Sync ---> FairSync // 公平鎖
				  |
				  |> NonfairSync // 非公平鎖
				  
二、Semaphore內的同步器都是經過Sync抽象接口來操做調用關係的,細看會發現基本上都是經過sync.xxx之類的這種調用方式的;

3.三、acquire()

一、源碼:
    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>Acquires a permit, if one is available and returns immediately,
     * reducing the number of available permits by one.
     *
     * <p>If no permit is available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * one of two things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #release} method for this
     * semaphore and the current thread is next to be assigned a permit; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); // 調用父類AQS中的獲取共享鎖資源的方法
    }
	
二、acquire是信號量獲取共享資源的入口,嘗試獲取鎖資源,獲取到了則立馬返回並跳出該方法,沒有獲取到則該方法阻塞等待;
   其主要也是調用sync的父類AQS的acquireSharedInterruptibly方法;

3.四、acquireSharedInterruptibly(int)

一、源碼:
    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 調用以前先檢測該線程中斷標誌位,檢測該線程在以前是否被中斷過
            throw new InterruptedException(); // 若被中斷過的話,則拋出中斷異常
        if (tryAcquireShared(arg) < 0) // 嘗試獲取共享資源鎖,小於0則獲取失敗,此方法由AQS的具體子類實現
            doAcquireSharedInterruptibly(arg); // 將嘗試獲取鎖資源的線程進行入隊操做
    }
	
二、acquireSharedInterruptibly是共享模式下線程獲取鎖資源的基類方法,每當線程獲取到一次共享資源,則共享資源數值就會作減法操做,直到共享資源值小於0時,則線程阻塞進入隊列等待;

三、並且該線程支持中斷,也正如方法名稱所意,當該方法檢測到中斷後則立馬會拋出中斷異常,讓調用該方法的地方立馬感知線程中斷狀況;

3.五、tryAcquireShared(int)

一、公平鎖tryAcquireShared源碼:
	// FairSync 公平鎖的 tryAcquireShared 方法
	protected int tryAcquireShared(int acquires) {
		for (;;) { // 自旋的死循環操做方式
			if (hasQueuedPredecessors()) // 檢查線程是否有阻塞隊列
				return -1; // 若是有阻塞隊列,說明共享資源的許可數量已經用完,返回-1乖乖進行入隊操做
			int available = getState(); // 獲取鎖資源的最新內存值
			int remaining = available - acquires; // 計算獲得剩下的許可數量
			if (remaining < 0 || // 若剩下的許可數量小於0,說明已經共享資源了,返回負數而後乖乖進入入隊操做
				compareAndSetState(available, remaining)) // 若共享資源大於或等於0,防止併發則經過CAS操做佔據最後一個共享資源
				return remaining; // 無論獲得remaining後進入了何種邏輯,操做了以後再將remaining返回,上層會根據remaining的值進行判斷是否須要入隊操做
		}
	}

二、非公平鎖tryAcquireShared源碼:	
	// NonfairSync 非公平鎖的 tryAcquireShared 方法
	protected int tryAcquireShared(int acquires) {
		return nonfairTryAcquireShared(acquires); // 
	}

	// NonfairSync 非公平鎖父類 Sync 類的 nonfairTryAcquireShared 方法	
	final int nonfairTryAcquireShared(int acquires) {
		for (;;) { // 自旋的死循環操做方式
			int available = getState(); // 獲取鎖資源的最新內存值
			int remaining = available - acquires; // 計算獲得剩下的許可數量
			if (remaining < 0 || // 若剩下的許可數量小於0,說明已經共享資源了,返回負數而後乖乖進入入隊操做
				compareAndSetState(available, remaining)) // 若共享資源大於或等於0,防止併發則經過CAS操做佔據最後一個共享資源
				return remaining; // 無論獲得remaining後進入了何種邏輯,操做了以後再將remaining返回,上層會根據remaining的值進行判斷是否須要入隊操做
		}
	}		
	
三、tryAcquireShared法是AQS的子類實現的,也就是Semaphore的兩個靜態內部類實現的,目的就是經過CAS嘗試獲取共享鎖資源,
   獲取共享鎖資源成功大於或等於0的天然數,獲取共享鎖資源失敗則返回負數;

3.六、doAcquireSharedInterruptibly(int)

一、源碼:
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
		// 按照給定的mode模式建立新的結點,模式有兩種:Node.EXCLUSIVE獨佔模式、Node.SHARED共享模式;
        final Node node = addWaiter(Node.SHARED); // 建立共享模式的結點
        boolean failed = true;
        try {
            for (;;) { // 自旋的死循環操做方式
                final Node p = node.predecessor(); // 獲取結點的前驅結點
                if (p == head) { // 若前驅結點爲head的話,那麼說明當前結點天然不用說了,僅次於老大以後的即是老二了咯
                    int r = tryAcquireShared(arg); // 並且老二也但願嘗試去獲取一下鎖,萬一頭結點恰巧剛剛釋放呢?但願仍是要有的,萬一實現了呢。。。
                    if (r >= 0) { // 若r>=0,說明已經成功的獲取到了共享鎖資源
                        setHeadAndPropagate(node, r); // 把當前node結點設置爲頭結點,而且調用doReleaseShared釋放一下無用的結點
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 根據前驅結點看看是否須要休息一下子
                    parkAndCheckInterrupt()) // 阻塞操做,正常狀況下,獲取不到共享鎖,代碼就在該方法中止了,直到被喚醒
					// 被喚醒後,發現parkAndCheckInterrupt()裏面檢測了被中斷了的話,則補上中斷異常,所以拋了個異常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
	
二、doAcquireSharedInterruptibly也是採用一個自旋的死循環操做方式,直到正常返回或者被喚醒拋出中斷異常爲止;

3.七、release()

一、源碼:
    /**
     * Releases a permit, returning it to the semaphore.
     *
     * <p>Releases a permit, increasing the number of available permits by
     * one.  If any threads are trying to acquire a permit, then one is
     * selected and given the permit that was just released.  That thread
     * is (re)enabled for thread scheduling purposes.
     *
     * <p>There is no requirement that a thread that releases a permit must
     * have acquired that permit by calling {@link #acquire}.
     * Correct usage of a semaphore is established by programming convention
     * in the application.
     */
    public void release() {
        sync.releaseShared(1); // 釋放一個許可資源 
    }
	
二、該方法是調用其父類AQS的一個釋放共享資源的基類方法;

3.八、releaseShared(int)

一、源碼:
    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 嘗試釋放共享鎖資源,此方法由AQS的具體子類實現
            doReleaseShared(); // 自旋操做,喚醒後繼結點
            return true;
        }
        return false;
    }
	
二、releaseShared主要是進行共享鎖資源釋放,若是釋放成功則喚醒隊列等待的結點,若是失敗則返回false,由上層調用方決定如何處理;

3.九、tryReleaseShared(int)

一、源碼:
	// NonfairSync 和 FairSync 的父類 Sync 類的 tryReleaseShared 方法	
	protected final boolean tryReleaseShared(int releases) {
		for (;;) { // 自旋的死循環操做方式
			int current = getState(); // 獲取最新的共享鎖資源值
			int next = current + releases; // 對許可數量進行加法操做
			// int類型值小於0,是由於該int類型的state狀態值溢出了,溢出了的話那得說明這個鎖有多難釋放啊,可能出問題了
			if (next < current) // overflow
				throw new Error("Maximum permit count exceeded");
			if (compareAndSetState(current, next)) // 
				return true; // 返回成功標誌,告訴上層該線程已經釋放了共享鎖資源
		}
	}
	
二、tryReleaseShared主要經過CAS操做對state鎖資源進行加法操做,騰出多餘的共享鎖資源供其它線程競爭;

3.十、doReleaseShared()

一、源碼:
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) { // 自旋的死循環操做方式
            Node h = head; // 每次都是取出隊列的頭結點
            if (h != null && h != tail) { // 若頭結點不爲空且也不是隊尾結點
                int ws = h.waitStatus; // 那麼則獲取頭結點的waitStatus狀態值
                if (ws == Node.SIGNAL) { // 若頭結點是SIGNAL狀態則意味着頭結點的後繼結點須要被喚醒了
					// 經過CAS嘗試設置頭結點的狀態爲空狀態,失敗的話,則繼續循環,由於併發有可能其它地方也在進行釋放操做
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // 喚醒頭結點的後繼結點
                }
				// 如頭結點爲空狀態,則把其改成PROPAGATE狀態,失敗的則多是由於併發而被改動過,則再次循環處理
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
			// 若頭結點沒有發生什麼變化,則說明上述設置已經完成,大功告成,功成身退
			// 若發生了變化,多是操做過程當中頭結點有了新增或者啥的,那麼則必須進行重試,以保證喚醒動做能夠延續傳遞
            if (h == head)                   // loop if head changed
                break;
        }
    }
	
二、doReleaseShared主要是釋放共享許可,可是最重要的目的仍是保證喚醒後繼結點的傳遞,來讓這些線程釋放他們所持有的信號量;

4、總結

一、在分析了AQS以後,再來分析Semaphore是否是變得比較簡單了;

二、在這裏我簡要總結一下Semaphore的流程的一些特性:
	• 管理一系列許可證,即state共享資源值;
	• 每acquire一次則state就減1一次,直到許可證數量小於0則阻塞等待;
	• 釋放許可的時候要保證喚醒後繼結點,以此來保證線程釋放他們所持有的信號量;
	• 是Synchronized的升級版,由於Synchronized是隻有一個許可,而Semaphore就像開了掛同樣,能夠有多個許可;

5、下載地址

https://gitee.com/ylimhhmily/SpringCloudTutorial.gitnode

SpringCloudTutorial交流QQ羣: 235322432git

SpringCloudTutorial交流微信羣: 微信溝通羣二維碼圖片連接微信

歡迎關注,您的確定是對我最大的支持!!!併發

相關文章
相關標籤/搜索