瘋狂創客圈 經典圖書 : 《Netty Zookeeper Redis 高併發實戰》 面試必備 + 面試必備 + 面試必備 【博客園總入口 】html
瘋狂創客圈 經典圖書 : 《SpringCloud、Nginx高併發核心編程》 大廠必備 + 大廠必備 + 大廠必備 【博客園總入口 】java
入大廠+漲工資必備: 高併發【 億級流量IM實戰】 實戰系列 【 SpringCloud Nginx秒殺】 實戰系列 【博客園總入口 】node
Semaphore是計數信號量。Semaphore管理一系列許可。每一個acquire方法阻塞,直到有一個許可證能夠得到而後拿走一個許可證;每一個release方法增長一個許可,這可能會釋放一個阻塞的acquire方法。然而,其實並無實際的許可這個對象,Semaphore只是維持了一個可得到許可證的數量。面試
好比:停車場入口立着的那個顯示屏,每有一輛車進入停車場顯示屏就會顯示剩餘車位減1,每有一輛車從停車場出去,顯示屏上顯示的剩餘車輛就會加1,當顯示屏上的剩餘車位爲0時,停車場入口的欄杆就不會再打開,車輛就沒法進入停車場了,直到有一輛車從停車場出去爲止。數據庫
好比:在學生時代都去餐廳打過飯,假若有3個窗口能夠打飯,同一時刻也只能有3名同窗打飯。第四我的來了以後就必須在外面等着,只要有打飯的同窗好了,就能夠去相應的窗口了 。編程
//建立具備給定的許可數和非公平的公平設置的 Semaphore。 Semaphore(int permits) //建立具備給定的許可數和給定的公平設置的 Semaphore。 Semaphore(int permits, boolean fair)
在上面咱們使用最基本的acquire方法和release方法就能夠實現Semaphore最多見的功能,不過其餘方法仍是須要咱們去了解一下的。併發
一、acquire(int permits) 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。就比如是一個學生佔兩個窗口。這同時也對應了相應的release方法。 二、release(int permits) 釋放給定數目的許可,將其返回到信號量。這個是對應於上面的方法,一個學生佔幾個窗口完事以後還要釋放多少 三、availablePermits() 返回此信號量中當前可用的許可數。也就是返回當前還有多少個窗口可用。 四、reducePermits(int reduction) 根據指定的縮減量減少可用許可的數目。 五、hasQueuedThreads() 查詢是否有線程正在等待獲取資源。 六、getQueueLength() 返回正在等待獲取的線程的估計數目。該值僅是估計的數字。 七、tryAcquire(int permits, long timeout, TimeUnit unit) 若是在給定的等待時間內此信號量有可用的全部許可,而且當前線程未被中斷,則今後信號量獲取給定數目的許可。 八、acquireUninterruptibly(int permits) 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。
這個案例使用的就是咱們以前的小例子,也就是去餐廳打飯的案例。高併發
咱們先看Test類:工具
在這個代碼中咱們看到,主要是new了一個Semaphore,而後賦給每一位同窗Student,接下來咱們就來好好看看Student線程是如何實現的。oop
在這個Student類中咱們最主要看run方法的實現,首先咱們經過acquire獲取了當前窗口的許可,而後休眠3秒錶明打飯,最後在finally使用release方法釋放這個窗口許可證。代碼很簡單,原理很清楚,咱們測試一波:
這個結果你也看到了,基本上同一時刻只能有三個學生在窗口旁邊。
在這裏你可能有一個疑問了,Semaphore好像和synchronized關鍵字沒什麼區別,均可以實現同步,若是是這樣那說明咱們尚未真正理解jdk的註釋,他只是限制了訪問某些資源的線程數,其實並無實現同步,咱們能夠看一下:
如今咱們在獲取許可前增長了一條輸出語句,也就是能打印出有哪一個線程進入了,再去測試一波
結果很清晰,因此對於Semaphore來講,咱們須要記住的實際上是資源的互斥而不是資源的同步,在同一時刻是沒法保證同步的,可是卻能夠保證資源的互斥。
用於那些資源有明確訪問數量限制的場景,經常使用於限流 。
好比:數據庫鏈接池,同時進行鏈接的線程有數量限制,鏈接不能超過必定的數量,當鏈接達到了限制數量後,後面的線程只能排隊等前面的線程釋放了數據庫鏈接才能得到數據庫鏈接。
好比:停車場場景,車位數量有限,同時只能容納多少臺車,車位滿了以後只有等裏面的車離開停車場外面的車才能夠進入。
Semaphore semaphore=new Semaphore(2);
一、當調用new Semaphore(2) 方法時,默認會建立一個非公平的鎖的同步阻塞隊列。
二、把初始許可數量賦值給同步隊列的state狀態,state的值就表明當前所剩餘的許可數量。
初始化完成後同步隊列信息以下圖:
semaphore.acquire();
一、當前線程會嘗試去同步隊列獲取一個許可,獲取許可的過程也就是使用原子的操做去修改同步隊列的state ,獲取一個許可則修改成state=state-1。
二、 當計算出來的state<0,則表明許可數量不足,此時會建立一個Node節點加入阻塞隊列,掛起當前線程。
三、當計算出來的state>=0,則表明獲取許可成功。
源碼:
/** * 獲取1個許可 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
/** * 共享模式下獲取許可,獲取成功則返回,失敗則加入阻塞隊列,掛起線程 * @param arg * @throws InterruptedException */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //嘗試獲取許可,arg爲獲取許可個數,當可用許可數減當前許可數結果小於0,則建立一個節點加入阻塞隊列,掛起當前線程。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
/** * 一、建立節點,加入阻塞隊列, * 二、重雙向鏈表的head,tail節點關係,清空無效節點 * 三、掛起當前節點線程 * @param arg * @throws InterruptedException */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //建立節點加入阻塞隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //得到當前節點pre節點 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//返回鎖的state if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //重組雙向鏈表,清空無效節點,掛起當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
線程一、線程二、線程三、分別調用semaphore.acquire(),整個過程隊列信息變化以下圖:
semaphore.release();
當調用semaphore.release() 方法時
一、線程會嘗試釋放一個許可,釋放許可的過程也就是把同步隊列的state修改成state=state+1的過程
二、釋放許可成功以後,同時會喚醒同步隊列的全部阻塞節共享節點線程
三、被喚醒的節點會從新嘗試去修改state=state-1 的操做,若是state>=0則獲取許可成功,不然從新進入阻塞隊列,掛起線程。
源碼:
/** * 釋放許可 */ public void release() { sync.releaseShared(1); }
/** *釋放共享鎖,同時喚醒全部阻塞隊列共享節點線程 * @param arg * @return */ public final boolean releaseShared(int arg) { //釋放共享鎖 if (tryReleaseShared(arg)) { //喚醒全部共享節點線程 doReleaseShared(); return true; } return false; }
/** * 喚醒全部共享節點線程 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//是否須要喚醒後繼節點 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改狀態爲初始0 continue; unparkSuccessor(h);//喚醒h.nex節點線程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)); } if (h == head) // loop if head changed break; } }
繼上面的圖,當咱們線程1調用semaphore.release(); 時候整個流程以下圖:
瘋狂創客圈 - Java高併發研習社羣,爲你們開啓大廠之門