雙向隊列(Deque),是Queue的一個子接口,雙向隊列是指該隊列兩端的元素既能入隊(offer)也能出隊(poll)。使用場景好比工做竊取,好比限流。ide
使用deque來限流,其中timeIntervalInMs爲事件窗口,maxLimit爲該事件窗口的最大值。測試
public class MyRateLimiter { private static final Logger LOGGER = LoggerFactory.getLogger(DemoRateLimiter.class); private final Deque<Long> queue; private long timeIntervalInMs; public MyRateLimiter(long timeIntervalInMs, int maxLimit) { this.timeIntervalInMs = timeIntervalInMs; this.queue = new LinkedBlockingDeque<Long>(maxLimit); } public boolean incrAndReachLimit(){ long currentTimeMillis = System.currentTimeMillis(); boolean success = queue.offerFirst(currentTimeMillis); if(success){ //沒有超過maxLimit return false; } synchronized (this){ //queue is full long last = queue.getLast(); //還在時間窗口內,超過maxLimit if (currentTimeMillis - last < timeIntervalInMs) { return true; } LOGGER.info("time window expired,current:{},last:{}",currentTimeMillis,last); //超過期間窗口了,超過maxLimit的狀況下,重置時間窗口 queue.removeLast(); queue.addFirst(currentTimeMillis); return false; } } }
測試this
@Test public void testDeque() throws InterruptedException { DemoRateLimiter limiter = new DemoRateLimiter(5*1000,3); Callable<Void> test = new Callable<Void>(){ @Override public Void call() throws Exception { for(int i=0;i<1000;i++){ LOGGER.info("result:{}",limiter.incrAndReachLimit()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } return null; } }; ExecutorService pool = Executors.newFixedThreadPool(10); pool.invokeAll(Arrays.asList(test,test,test,test,test)); Thread.sleep(100000); }
這裏使用了Deque的容量來做爲時間窗口的限流大小,利用兩端來判斷時間窗口,相對來說有點巧妙。code