阻塞隊列與普通隊列的區別在於,當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,或者當隊列是滿時,往隊列裏添加元素的操做會被阻塞java
延遲阻塞隊列DelayQueue
的底層是基於優先級隊列PriorityQueue
來實現的,所以研究延遲阻塞隊列,更多的注意力應集中在如下兩點緩存
類的聲明以下,要求隊列中的元素必須繼承 Delayed
安全
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
這個限定,主要服務於優先級隊列的排序要求,根據延遲時間對元素隊列中的元素進行排序併發
入隊的實現邏輯比較簡單,爲了保證併發安全,實現中實現加鎖機制異步
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
入隊的實際是交由優先級隊列進行實現,須要注意的是,入隊以後,額外的一個操做,若是入隊的元素剛好在隊列頭,執行兩個操做ide
leader
賦值爲空 (這個是幹嗎的,爲何這麼作?)available.signal()
喚醒被阻塞的線程(什麼線程被阻塞?)public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
出隊的操做一樣加鎖,獲取隊列頭的元素,判斷延期時間是否結束,是才返回結果,不然返回nullpost
注意,這裏有兩個疑問性能
getDelay()
方法返回值會變麼,由誰來改變呢?雖然上面的出隊和入隊的邏輯比較簡單,可是留下的疑問一點都很多,上面的四個問題應該如何解答?學習
繼續看源碼,發現還有一個出隊的方法, 傳入了兩個參數表示阻塞的超時時間(即超過這個時間沒有返回,則拋一箇中端異常)測試
public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 時間轉換爲納秒 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 獲取隊列頭 E first = q.peek(); if (first == null) { // 隊列爲空 if (nanos <= 0) // 延時時間已過,直接返回null return null; else // 當前線程阻塞 nanos (ns),而後再次循環 nanos = available.awaitNanos(nanos); } else { // 隊列非空 // 獲取隊列頭元素的延遲時間 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 延遲時間小於0,直接返回隊列頭 return q.poll(); if (nanos <= 0) // 阻塞時間已過,隊列頭的延遲時間還沒到,則返回null return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) // 沒法獲取當前的隊列頭 //(由於隊列頭延遲時間大於阻塞時間,即隊列頭不生效) // 繼續阻塞,以指望此時可能新增一個到隊列頭 nanos = available.awaitNanos(nanos); else { // 能夠獲取當前隊列頭 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 阻塞到隊列頭生效 long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
分析: 以當前隊列爲空做爲條件
上面代碼的流程以下:
繼續化重點
getDelay
返回小於0)上面的方法由於加上了一個超時時間(即在指定的時間內依然沒法返回時,斷掉阻塞),分析起來可能不太順暢,再看源碼,還有一個take方法,邏輯與上面類似,只是砍掉了超時斷開阻塞的邏輯
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) // 隊列頭爲空,則阻塞,直到新增一個入隊爲止 available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 若隊列頭元素已生效,則直接返回 return q.poll(); first = null; // don't retain ref while waiting if (leader != null) // leader 非空時,表示有其餘的一個線程在出隊阻塞中 // 此時掛住當前線程,等待另外一個線程出隊完成 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待隊列頭元素生效 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 當前線程出隊完成,通知其餘出隊阻塞的線程繼續執行 available.signal(); lock.unlock(); } }
經過了以前的燒腦邏輯以後,再看這個就簡單不少了
如有另外一個線程已經處於等待隊列頭生效的阻塞過程當中,則阻塞當前線程,直到另外一個線程完成出隊操做
若沒有其餘線程阻塞在出隊過程當中,即當前線程爲第一個獲取隊列頭的線程
leader = thisThread
)leader=null
)最後步驟1中被阻塞的線程
所以能夠愉快的解答上面的四個問題
添加一個元素到隊列頭
leader賦值爲空 (這個是幹嗎的,爲何這麼作?)
leader記錄了被阻塞在等待隊列頭生效的線程 新增一個元素到隊列頭,表示等待原來隊列頭生效的阻塞的線程已經失去了阻塞的意義,此時須要獲取新的隊列頭進行返回了
available.signal() 喚醒被阻塞的線程(什麼線程被阻塞?)
獲取隊列頭的線程被喚起,主要有兩種場景: 1. 以前隊列爲空,致使被阻塞的線程 2. 以前隊列非空,可是隊列頭沒有生效致使被阻塞的線程
普通的出隊方法
隊列中元素getDelay()方法返回值會變麼,由誰來改變呢?
必須得變,不然這個元素一直不生效,將直接致使線程一直阻塞 由隊列中的元素實現類來保證,返回值是逐漸變小的
上面的出隊沒有阻塞,直接返回了null
須要阻塞獲取隊列頭,用 `take`, `poll(long,TimeUnit)`來代替
上面分析的是阻塞隊列的實現原理,接下來舉一個實例來解析下這個延遲阻塞隊列的使用姿式,加深下理解
(簡化了在簡化以後的,與實際會有一些區別,請勿徹底認定合理)
好比和電商的詳情頁展現,爲了提升應用的性能,咱們將整個頁面進行了緩存,當詳情頁發生修改後,咱們會更新緩存的內容
所以爲了保證緩存的內容和實際的內容是一致的,咱們須要一個對帳的任務,當詳情頁修改後,而且更新緩存完成以後,咱們須要再次對比緩存和實際內容的一致性;
此時一個異步的任務能夠這麼設計:監聽詳情頁修改的事件,延遲一段時間,而後再對比緩存和實際內容的一致性 (這裏延遲一段時間主要是爲了保證緩存已經更新完成)
@Data @NoArgsConstructor @AllArgsConstructor public class DetailInfo { private int itemId; private String title; private String desc; private int price; }
UpdateTask
注意其中 getDelay()
的實現邏輯,根據當前時間與預訂的延遲生效時間進行比較
@Data @AllArgsConstructor public class UpdateTask implements Delayed { private int itemId; private long delayTime; @Override public long getDelay(TimeUnit unit) { return delayTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { return (int) (getDelay(TimeUnit.MICROSECONDS) - o.getDelay(TimeUnit.MICROSECONDS)); } }
更新事件的監聽訂閱使用了 Guava的EventBus
來處理,若有疑問能夠搜索EventBus的使用姿式
public class DetailManager { // 模擬真實數據存儲空間 private Map<Integer, DetailInfo> realMap = new ConcurrentHashMap<>(); // 模擬緩存空間 private Map<String, String> cache = new ConcurrentHashMap<>(); private Gson gson = new Gson(); private String getCacheKey(int itemId) { return "detailInfo_" + itemId; } // eventBus 用於發送更新事件;異步接受更新事件 private AsyncEventBus eventBus; private void init() { DetailInfo detailInfo = new DetailInfo(1, "onw", "第一個測試", 100); DetailInfo detailInfo2 = new DetailInfo(2, "two", "第二個測試", 200); realMap.put(detailInfo.getItemId(), detailInfo); realMap.put(detailInfo2.getItemId(), detailInfo2); cache.put(getCacheKey(detailInfo.getItemId()), gson.toJson(detailInfo)); cache.put(getCacheKey(detailInfo2.getItemId()), gson.toJson(detailInfo2)); eventBus = new AsyncEventBus("Validate-Thread", Executors.newFixedThreadPool(2)); eventBus.register(this); } // 模擬更新商品 public void updateDetail(int itemId) { DetailInfo detailInfo = realMap.get(itemId); long now = System.currentTimeMillis(); detailInfo.setTitle("title_" + itemId + "_" + now); cache.put(getCacheKey(itemId), gson.toJson(detailInfo)); // 發送一個修改的事件 eventBus.post(new UpdateTask(itemId, now + 5000)); System.out.println("[UpdateInfo]>>>ItemId: " + itemId + " updateTime: " + now + " validateTime: " + (now + 5000)); } // 延遲隊列 private DelayQueue<UpdateTask> delayQueue = new DelayQueue<>(); /** * 監聽修改事件 * @param updateTask */ @Subscribe public void verify(UpdateTask updateTask) { long getTaskTime = System.currentTimeMillis(); delayQueue.put(updateTask); try { UpdateTask task = delayQueue.take(); long processTime = System.currentTimeMillis(); DetailInfo real = realMap.get(task.getItemId()); String cacheObj = cache.get(getCacheKey(task.getItemId())); boolean ans = gson.toJson(real).equals(cacheObj); System.out.println("validate itemId: " + updateTask.getItemId() + " getEventTime: " + getTaskTime + " processTime:" + processTime + " ans: " + ans); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DetailManager detailManager = new DetailManager(); detailManager.init(); // 開始修改 detailManager.updateDetail(1); Thread.sleep(20); detailManager.updateDetail(2); Thread.sleep(35000); } }
簡單說明主流程
DetailManager
輸出結果以下
[UpdateInfo]>>>ItemId: 1 updateTime: 1508677959067 validateTime: 1508677964067 [UpdateInfo]>>>ItemId: 2 updateTime: 1508677959103 validateTime: 1508677964103 Thread[pool-1-thread-1,5,main]>>> validate itemId: 1 getEventTime: 1508677959078 processTime:1508677964067 ans: true Thread[pool-1-thread-2,5,main]>>> validate itemId: 2 getEventTime: 1508677964067 processTime:1508677964103 ans: true
從上面的輸出能夠得知,實際驗證的時間戳和預期的時間錯是相同的
延遲阻塞隊列DelayQueue,學習下來以後感受很是有意思,首先是加深了使用姿式的瞭解,其次對其中的阻塞,喚醒機制有了必定了解,漲了鎖使用知識的見識(這裏面還有一個很是有意思的東西就是 Condition
和 ReentrantLock
的使用,後續線程安全篇的研究能夠以此做爲應用場景)
簡單小結上面的學習內容
PriorityQueue
take()
, poll(long, TimeUnit)
兩方法之一Delayed
接口,內部實現的getDelay
方法,要求返回值愈來愈小(若是一直大於0,這個延遲任務就一直沒法執行了)GG