一、在前面章節瞭解了CAS、AQS後,想必你們已經對這塊知識有了深入的瞭解了; 二、而JDK中有一個關於信號量的工具類,它也是基於AQS實現的,能夠認爲是synchronized的升級版(結尾處會講解到); 三、那麼本章節就和你們分享分析一下JDK1.8的Semaphore的工做原理;
一、Semaphore顧名思義,叫信號量; 二、Semaphore可用來控制同時訪問特定資源的線程數量,以此來達到協調線程工做; 三、Semaphore內部也有公平鎖、非公平鎖的靜態內部類,就像ReentrantLock同樣,Semaphore內部基本上是經過sync.xxx之類的這種調用方式的; 四、Semaphore內部維護了一個虛擬的資源池,若是許可爲0則線程阻塞等待,直到許可大於0時又能夠有機會獲取許可了;
一、其實Semaphore的實現也偏偏很好利用了其父類AQS的state變量值; 二、初始化一個數量值做爲許可池的資源,假設爲N,那麼當任何線程獲取到資源時,許可減1,直到許可爲0時後續來的線程就須要等待; 三、Semaphore,簡單大體意思爲:A、B、C、D線程同時爭搶資源,目前卡槽大小爲2,若A、B正在執行且未執行完,那麼C、D線程在門外等着,一旦A、B有1個執行完了,那麼C、D就會競爭看誰先執行; state初始值假設爲N,後續每tryAcquire()一次,state會CAS減1,當state爲0時其它線程處於等待狀態, 直到state>0且<N後,進程又能夠獲取到鎖進行各自操做了;
一、public Semaphore(int permits) // 建立一個給定許可數量的信號量對象,且默認以非公平鎖方式獲取資源 二、public Semaphore(int permits, boolean fair) // 建立一個給定許可數量的信號量對象,且是否公平方式由傳入的fair布爾參數值決定 三、public void acquire() // 今後信號量獲取一個許可,當許可數量小於零時,則阻塞等待 四、public void acquire(int permits) // 今後信號量獲取permits個許可,當許可數量小於零時,則阻塞等待,可是當阻塞等待的線程被喚醒後發現被中斷過的話則會拋InterruptedException異常 五、public void acquireUninterruptibly(int permits) // 今後信號量獲取permits個許可,當許可數量小於零時,則阻塞等待,可是當阻塞等待的線程被喚醒後發現被中斷過的話不會拋InterruptedException異常 六、public void release() // 釋放一個許可 七、public void acquire(int permits) // 釋放permits個許可
一、獲取共享鎖: public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } acquire{ 若是檢測中斷狀態發現被中斷過的話,那麼則拋出InterruptedException異常 若是嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各類方式由AQS的子類實現 ), 那麼就新增共享鎖結點經過自旋操做加入到隊列中,而且根據結點中的waitStatus來決定是否調用LockSupport.park進行休息 } 二、釋放共享鎖: public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } release{ 若是嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各類方式由AQS的子類實現 ), 那麼經過自旋操做喚完成阻塞線程的喚起操做 }
好比咱們每天在外面吃快餐,我就以吃快餐爲例生活化闡述該Semaphore原理: 一、場景:餐廳只有一個排隊的走廊,只有十個打飯窗口; 二、開飯時間點,剛開始的時候,人數很少,屈指可數,窗口不少,打飯菜天然很快,但隨着時間的推移人數會愈來愈多,會呈現阻塞擁擠情況,排起了慢慢長隊; 三、人數愈來愈多,但窗口只有十個,後來的就只好按先來後到排隊等待打飯菜咯,前面窗口空缺一個,排隊最前的一個則上去打飯菜,秩序有條不紊; 四、總之你們都挨個挨個排隊打飯,先來後到,相安無事的順序打飯菜; 五、到此打止,一、二、三、4能夠認爲是一種公平方式的信號量共享鎖; 六、可是呢,還有那麼些緊急趕時間的人,來餐廳時恰好看到師傅剛剛打完一我的的飯菜,因而插入去打飯菜敢時間; 七、若是敢時間人的來的時候發現師傅還在打飯菜,那麼就只得乖乖的排隊等候打飯菜咯; 八、到此打止,一、二、六、7能夠認爲是一種非公平方式的信號量共享鎖;
一、構造器源碼: /** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee * first-in first-out granting of permits under contention, * else {@code false} */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } 二、建立一個給定許可數量的信號量對象,默認使用非公平鎖,固然也可經過fair布爾參數值決定是公平鎖仍是非公平鎖;
一、AQS --> Sync ---> FairSync // 公平鎖 | |> NonfairSync // 非公平鎖 二、Semaphore內的同步器都是經過Sync抽象接口來操做調用關係的,細看會發現基本上都是經過sync.xxx之類的這種調用方式的;
一、源碼: /** * Acquires a permit from this semaphore, blocking until one is * available, or the thread is {@linkplain Thread#interrupt interrupted}. * * <p>Acquires a permit, if one is available and returns immediately, * reducing the number of available permits by one. * * <p>If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * one of two things happens: * <ul> * <li>Some other thread invokes the {@link #release} method for this * semaphore and the current thread is next to be assigned a permit; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * for a permit, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); // 調用父類AQS中的獲取共享鎖資源的方法 } 二、acquire是信號量獲取共享資源的入口,嘗試獲取鎖資源,獲取到了則立馬返回並跳出該方法,沒有獲取到則該方法阻塞等待; 其主要也是調用sync的父類AQS的acquireSharedInterruptibly方法;
一、源碼: /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 調用以前先檢測該線程中斷標誌位,檢測該線程在以前是否被中斷過 throw new InterruptedException(); // 若被中斷過的話,則拋出中斷異常 if (tryAcquireShared(arg) < 0) // 嘗試獲取共享資源鎖,小於0則獲取失敗,此方法由AQS的具體子類實現 doAcquireSharedInterruptibly(arg); // 將嘗試獲取鎖資源的線程進行入隊操做 } 二、acquireSharedInterruptibly是共享模式下線程獲取鎖資源的基類方法,每當線程獲取到一次共享資源,則共享資源數值就會作減法操做,直到共享資源值小於0時,則線程阻塞進入隊列等待; 三、並且該線程支持中斷,也正如方法名稱所意,當該方法檢測到中斷後則立馬會拋出中斷異常,讓調用該方法的地方立馬感知線程中斷狀況;
一、公平鎖tryAcquireShared源碼: // FairSync 公平鎖的 tryAcquireShared 方法 protected int tryAcquireShared(int acquires) { for (;;) { // 自旋的死循環操做方式 if (hasQueuedPredecessors()) // 檢查線程是否有阻塞隊列 return -1; // 若是有阻塞隊列,說明共享資源的許可數量已經用完,返回-1乖乖進行入隊操做 int available = getState(); // 獲取鎖資源的最新內存值 int remaining = available - acquires; // 計算獲得剩下的許可數量 if (remaining < 0 || // 若剩下的許可數量小於0,說明已經共享資源了,返回負數而後乖乖進入入隊操做 compareAndSetState(available, remaining)) // 若共享資源大於或等於0,防止併發則經過CAS操做佔據最後一個共享資源 return remaining; // 無論獲得remaining後進入了何種邏輯,操做了以後再將remaining返回,上層會根據remaining的值進行判斷是否須要入隊操做 } } 二、非公平鎖tryAcquireShared源碼: // NonfairSync 非公平鎖的 tryAcquireShared 方法 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); // } // NonfairSync 非公平鎖父類 Sync 類的 nonfairTryAcquireShared 方法 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 自旋的死循環操做方式 int available = getState(); // 獲取鎖資源的最新內存值 int remaining = available - acquires; // 計算獲得剩下的許可數量 if (remaining < 0 || // 若剩下的許可數量小於0,說明已經共享資源了,返回負數而後乖乖進入入隊操做 compareAndSetState(available, remaining)) // 若共享資源大於或等於0,防止併發則經過CAS操做佔據最後一個共享資源 return remaining; // 無論獲得remaining後進入了何種邏輯,操做了以後再將remaining返回,上層會根據remaining的值進行判斷是否須要入隊操做 } } 三、tryAcquireShared法是AQS的子類實現的,也就是Semaphore的兩個靜態內部類實現的,目的就是經過CAS嘗試獲取共享鎖資源, 獲取共享鎖資源成功大於或等於0的天然數,獲取共享鎖資源失敗則返回負數;
一、源碼: /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 按照給定的mode模式建立新的結點,模式有兩種:Node.EXCLUSIVE獨佔模式、Node.SHARED共享模式; final Node node = addWaiter(Node.SHARED); // 建立共享模式的結點 boolean failed = true; try { for (;;) { // 自旋的死循環操做方式 final Node p = node.predecessor(); // 獲取結點的前驅結點 if (p == head) { // 若前驅結點爲head的話,那麼說明當前結點天然不用說了,僅次於老大以後的即是老二了咯 int r = tryAcquireShared(arg); // 並且老二也但願嘗試去獲取一下鎖,萬一頭結點恰巧剛剛釋放呢?但願仍是要有的,萬一實現了呢。。。 if (r >= 0) { // 若r>=0,說明已經成功的獲取到了共享鎖資源 setHeadAndPropagate(node, r); // 把當前node結點設置爲頭結點,而且調用doReleaseShared釋放一下無用的結點 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && // 根據前驅結點看看是否須要休息一下子 parkAndCheckInterrupt()) // 阻塞操做,正常狀況下,獲取不到共享鎖,代碼就在該方法中止了,直到被喚醒 // 被喚醒後,發現parkAndCheckInterrupt()裏面檢測了被中斷了的話,則補上中斷異常,所以拋了個異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 二、doAcquireSharedInterruptibly也是採用一個自旋的死循環操做方式,直到正常返回或者被喚醒拋出中斷異常爲止;
一、源碼: /** * Releases a permit, returning it to the semaphore. * * <p>Releases a permit, increasing the number of available permits by * one. If any threads are trying to acquire a permit, then one is * selected and given the permit that was just released. That thread * is (re)enabled for thread scheduling purposes. * * <p>There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link #acquire}. * Correct usage of a semaphore is established by programming convention * in the application. */ public void release() { sync.releaseShared(1); // 釋放一個許可資源 } 二、該方法是調用其父類AQS的一個釋放共享資源的基類方法;
一、源碼: /** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 嘗試釋放共享鎖資源,此方法由AQS的具體子類實現 doReleaseShared(); // 自旋操做,喚醒後繼結點 return true; } return false; } 二、releaseShared主要是進行共享鎖資源釋放,若是釋放成功則喚醒隊列等待的結點,若是失敗則返回false,由上層調用方決定如何處理;
一、源碼: // NonfairSync 和 FairSync 的父類 Sync 類的 tryReleaseShared 方法 protected final boolean tryReleaseShared(int releases) { for (;;) { // 自旋的死循環操做方式 int current = getState(); // 獲取最新的共享鎖資源值 int next = current + releases; // 對許可數量進行加法操做 // int類型值小於0,是由於該int類型的state狀態值溢出了,溢出了的話那得說明這個鎖有多難釋放啊,可能出問題了 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) // return true; // 返回成功標誌,告訴上層該線程已經釋放了共享鎖資源 } } 二、tryReleaseShared主要經過CAS操做對state鎖資源進行加法操做,騰出多餘的共享鎖資源供其它線程競爭;
一、源碼: /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { // 自旋的死循環操做方式 Node h = head; // 每次都是取出隊列的頭結點 if (h != null && h != tail) { // 若頭結點不爲空且也不是隊尾結點 int ws = h.waitStatus; // 那麼則獲取頭結點的waitStatus狀態值 if (ws == Node.SIGNAL) { // 若頭結點是SIGNAL狀態則意味着頭結點的後繼結點須要被喚醒了 // 經過CAS嘗試設置頭結點的狀態爲空狀態,失敗的話,則繼續循環,由於併發有可能其它地方也在進行釋放操做 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); // 喚醒頭結點的後繼結點 } // 如頭結點爲空狀態,則把其改成PROPAGATE狀態,失敗的則多是由於併發而被改動過,則再次循環處理 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若頭結點沒有發生什麼變化,則說明上述設置已經完成,大功告成,功成身退 // 若發生了變化,多是操做過程當中頭結點有了新增或者啥的,那麼則必須進行重試,以保證喚醒動做能夠延續傳遞 if (h == head) // loop if head changed break; } } 二、doReleaseShared主要是釋放共享許可,可是最重要的目的仍是保證喚醒後繼結點的傳遞,來讓這些線程釋放他們所持有的信號量;
一、在分析了AQS以後,再來分析Semaphore是否是變得比較簡單了; 二、在這裏我簡要總結一下Semaphore的流程的一些特性: • 管理一系列許可證,即state共享資源值; • 每acquire一次則state就減1一次,直到許可證數量小於0則阻塞等待; • 釋放許可的時候要保證喚醒後繼結點,以此來保證線程釋放他們所持有的信號量; • 是Synchronized的升級版,由於Synchronized是隻有一個許可,而Semaphore就像開了掛同樣,能夠有多個許可;
https://gitee.com/ylimhhmily/SpringCloudTutorial.gitnode
SpringCloudTutorial交流QQ羣: 235322432git
SpringCloudTutorial交流微信羣: 微信溝通羣二維碼圖片連接微信
歡迎關注,您的確定是對我最大的支持!!!併發