這是java高併發系列第29篇。java
環境:jdk1.8。node
以秒殺業務爲例,10個iphone,100萬人搶購,100萬人同時發起請求,最終可以搶到的人也就是前面幾我的,後面的基本上都沒有但願了,那麼咱們能夠經過控制併發數來實現,好比並發數控制在10個,其餘超過併發數的請求所有拒絕,提示:秒殺失敗,請稍後重試。面試
併發控制的,通俗解釋:一大波人去商場購物,必須通過一個門口,門口有個門衛,兜裏面有指定數量的門禁卡,來的人先去門衛那邊拿取門禁卡,拿到卡的人才能夠刷卡進入商場,拿不到的能夠繼續等待。進去的人出來以後會把卡歸還給門衛,門衛能夠把歸還來的卡繼續發放給其餘排隊的顧客使用。算法
JUC中提供了這樣的工具類:Semaphore,示例代碼:安全
package com.itsoku.chat29; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * 跟着阿里p7學併發,微信公衆號:javacode2018 */ public class Demo1 { static Semaphore semaphore = new Semaphore(5); public static void main(String[] args) { for (int i = 0; i < 20; i++) { new Thread(() -> { boolean flag = false; try { flag = semaphore.tryAcquire(100, TimeUnit.MICROSECONDS); if (flag) { //休眠2秒,模擬下單操做 System.out.println(Thread.currentThread() + ",嘗試下單中。。。。。"); TimeUnit.SECONDS.sleep(2); } else { System.out.println(Thread.currentThread() + ",秒殺失敗,請稍微重試!"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (flag) { semaphore.release(); } } }).start(); } } }
輸出:微信
Thread[Thread-10,5,main],嘗試下單中。。。。。 Thread[Thread-8,5,main],嘗試下單中。。。。。 Thread[Thread-9,5,main],嘗試下單中。。。。。 Thread[Thread-12,5,main],嘗試下單中。。。。。 Thread[Thread-11,5,main],嘗試下單中。。。。。 Thread[Thread-2,5,main],秒殺失敗,請稍微重試! Thread[Thread-1,5,main],秒殺失敗,請稍微重試! Thread[Thread-18,5,main],秒殺失敗,請稍微重試! Thread[Thread-16,5,main],秒殺失敗,請稍微重試! Thread[Thread-0,5,main],秒殺失敗,請稍微重試! Thread[Thread-3,5,main],秒殺失敗,請稍微重試! Thread[Thread-14,5,main],秒殺失敗,請稍微重試! Thread[Thread-6,5,main],秒殺失敗,請稍微重試! Thread[Thread-13,5,main],秒殺失敗,請稍微重試! Thread[Thread-17,5,main],秒殺失敗,請稍微重試! Thread[Thread-7,5,main],秒殺失敗,請稍微重試! Thread[Thread-19,5,main],秒殺失敗,請稍微重試! Thread[Thread-15,5,main],秒殺失敗,請稍微重試! Thread[Thread-4,5,main],秒殺失敗,請稍微重試! Thread[Thread-5,5,main],秒殺失敗,請稍微重試!
關於Semaphore
的使用,能夠移步:JUC中的Semaphore(信號量)多線程
國慶期間比較火爆的景點,人流量巨大,通常入口處會有限流的彎道,讓遊客進去進行排隊,排在前面的人,每隔一段時間會放一撥進入景區。排隊人數超過了指定的限制,後面再來的人會被告知今天已經遊客量已經達到峯值,會被拒絕排隊,讓其明天或者之後再來,這種玩法採用漏桶限流的方式。併發
漏桶算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水,當水流入速度過大會直接溢出,能夠看出漏桶算法能強行限制數據的傳輸速率。框架
漏桶算法示意圖:iphone
簡陋版的實現,代碼以下:
package com.itsoku.chat29; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; /** * 跟着阿里p7學併發,微信公衆號:javacode2018 */ public class Demo2 { public static class BucketLimit { static AtomicInteger threadNum = new AtomicInteger(1); //容量 private int capcity; //流速 private int flowRate; //流速時間單位 private TimeUnit flowRateUnit; private BlockingQueue<Node> queue; //漏桶流出的任務時間間隔(納秒) private long flowRateNanosTime; public BucketLimit(int capcity, int flowRate, TimeUnit flowRateUnit) { this.capcity = capcity; this.flowRate = flowRate; this.flowRateUnit = flowRateUnit; this.bucketThreadWork(); } //漏桶線程 public void bucketThreadWork() { this.queue = new ArrayBlockingQueue<Node>(capcity); //漏桶流出的任務時間間隔(納秒) this.flowRateNanosTime = flowRateUnit.toNanos(1) / flowRate; Thread thread = new Thread(this::bucketWork); thread.setName("漏桶線程-" + threadNum.getAndIncrement()); thread.start(); } //漏桶線程開始工做 public void bucketWork() { while (true) { Node node = this.queue.poll(); if (Objects.nonNull(node)) { //喚醒任務線程 LockSupport.unpark(node.thread); } //休眠flowRateNanosTime LockSupport.parkNanos(this.flowRateNanosTime); } } //返回一個漏桶 public static BucketLimit build(int capcity, int flowRate, TimeUnit flowRateUnit) { if (capcity < 0 || flowRate < 0) { throw new IllegalArgumentException("capcity、flowRate必須大於0!"); } return new BucketLimit(capcity, flowRate, flowRateUnit); } //當前線程加入漏桶,返回false,表示漏桶已滿;true:表示被漏桶限流成功,能夠繼續處理任務 public boolean acquire() { Thread thread = Thread.currentThread(); Node node = new Node(thread); if (this.queue.offer(node)) { LockSupport.park(); return true; } return false; } //漏桶中存放的元素 class Node { private Thread thread; public Node(Thread thread) { this.thread = thread; } } } public static void main(String[] args) { BucketLimit bucketLimit = BucketLimit.build(10, 60, TimeUnit.MINUTES); for (int i = 0; i < 15; i++) { new Thread(() -> { boolean acquire = bucketLimit.acquire(); System.out.println(System.currentTimeMillis() + " " + acquire); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } }
代碼中BucketLimit.build(10, 60, TimeUnit.MINUTES);
建立了一個容量爲10,流水爲60/分鐘的漏桶。
代碼中用到的技術有:
令牌桶算法的原理是系統以恆定的速率產生令牌,而後把令牌放到令牌桶中,令牌桶有一個容量,當令牌桶滿了的時候,再向其中放令牌,那麼多餘的令牌會被丟棄;當想要處理一個請求的時候,須要從令牌桶中取出一個令牌,若是此時令牌桶中沒有令牌,那麼則拒絕該請求。從原理上看,令牌桶算法和漏桶算法是相反的,一個「進水」,一個是「漏水」。這種算法能夠應對突發程度的請求,所以比漏桶算法好。
令牌桶算法示意圖:
有興趣的能夠本身去實現一個。
Google開源工具包Guava提供了限流工具類RateLimiter,能夠很是方便的控制系統每秒吞吐量,示例代碼以下:
package com.itsoku.chat29; import com.google.common.util.concurrent.RateLimiter; import java.util.Calendar; import java.util.Date; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; /** * 跟着阿里p7學併發,微信公衆號:javacode2018 */ public class Demo3 { public static void main(String[] args) throws InterruptedException { RateLimiter rateLimiter = RateLimiter.create(5);//設置QPS爲5 for (int i = 0; i < 10; i++) { rateLimiter.acquire(); System.out.println(System.currentTimeMillis()); } System.out.println("----------"); //能夠隨時調整速率,咱們將qps調整爲10 rateLimiter.setRate(10); for (int i = 0; i < 10; i++) { rateLimiter.acquire(); System.out.println(System.currentTimeMillis()); } } }
輸出:
1566284028725 1566284028922 1566284029121 1566284029322 1566284029522 1566284029721 1566284029921 1566284030122 1566284030322 1566284030522 ---------- 1566284030722 1566284030822 1566284030921 1566284031022 1566284031121 1566284031221 1566284031321 1566284031422 1566284031522 1566284031622
代碼中RateLimiter.create(5)
建立QPS爲5的限流對象,後面又調用rateLimiter.setRate(10);
將速率設爲10,輸出中分2段,第一段每次輸出相隔200毫秒,第二段每次輸出相隔100毫秒,能夠很是精準的控制系統的QPS。
上面介紹的這些,業務中可能會用到,也能夠用來應對面試。
java高併發系列連載中,總計估計會有四五十篇文章。
阿里p7一塊兒學併發,公衆號:路人甲java,天天獲取最新文章!