在Java中,BlockingQueue
接口位於java.util.concurrent
包下。阻塞隊列主要用來線程安全的實現生產者-消費者模型。他們能夠使用於多個生產者和多個消費者的場景中。java
咱們能夠在各類論壇和文章中找到BlockingQueue
的範例。在這篇文章中,咱們將介紹如何持續管理隊列中的請求,以及如何在請求進入隊列後馬上處理。面試
咱們將使用單個線程管理任務放入隊列的操做以及從隊列中取出的操做。同時這個線程會持續的管理隊列。另外一個線程將用來建立BlockingQueue
,它將一直運行知道服務器終止。安全
阻塞隊列的大小能夠在對象初始化的時候設置。它的大小應該基於系統堆的大小。服務器
如今,讓咱們回顧建立阻塞隊列的步驟以及如何持續的管理和處理請求。微信
新建一個EventData
的POJO類,它會存儲生產者產生的事件數據並輸入到隊列中 - 同時它會被消費者從隊列中取出e並處理。ide
package com.dzone.blockingqueue.example; class EventData { private String eventID; private String eventName; private String eventDate; private String eventType; private String eventLocation; public String getEventID() { return eventID; } public void setEventID(String eventID) { this.eventID = eventID; } public String getEventName() { return eventName; } public void setEventName(String eventName) { this.eventName = eventName; } public String getEventDate() { return eventDate; } public void setEventDate(String eventDate) { this.eventDate = eventDate; } public String getEventType() { return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } public String getEventLocation() { return eventLocation; } public void setEventLocation(String eventLocation) { this.eventLocation = eventLocation; } }
建立一個QueueService
單例類,用來將請求放入隊列中,以及從隊列中提取請求並處理。this
package com.dzone.blockingqueue.example; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class QueueService { private static QueueService instance = null; private static BlockingQueue < EventData > eventQueue = null; private QueueService() {} public static QueueService getInstance() { if (instance == null) { instance = new QueueService(); } return instance; } private void initialize() { if (eventQueue == null) { eventQueue = new LinkedBlockingQueue <EventData> (); EventProcessor eventProcessor = new EventProcessor(); eventProcessor.start(); } } public void putEventInQueue(EventData eventData) { try { initialize(); eventQueue.put(eventData); } catch (InterruptedException ex) { ex.printStackTrace(); } } class EventProcessor extends Thread { @Override public void run() { for (;;) { EventData eventData = null; try { eventData = eventQueue.take(); System.out.println("Process Event Data : Type : " + eventData.getEventType() + " / Name : " + eventData.getEventName()); } catch (InterruptedException ex) { ex.printStackTrace(); } } } } }
咱們新建了一個靜態的BlockingQueue
變量。它在初始化時會比初始化爲ArrayBlockingQueue
或是LinkedBlockingQueue
,這取決於需求。在此以後,這個對象會被用來放入或是提取請求。spa
咱們還新建了一個繼承了Thread
的EventProcessor
私有類。它在BlockingQueue初始化的時候啓動。在EventProcessor
中使用了一個for循環來管理隊列。BlockingQueue
的優勢在於它會在沒有元素的時候進入等待模式。當隊列爲空時,for循環不會繼續遍歷。當請求進入隊列後,BlockingQueue
會繼續運行並處理請求。線程
單個EventProcessor線程將處理特定隊列中的全部請求。此線程永遠不會過時,有助於實現持續監控。code
咱們還在QueueService
中建立了一個公有的putEventInQueue
方法,它會幫助咱們將請求放入由getInstance
方法獲取的隊列中。在這個方法裏,請求被放入BlockingQueue
。這些請求將會自動的被BlockingQueue
獲取,並在EventProcessor
線程中繼續處理。
如今讓咱們向隊列中加載數據。咱們已經實現了一個EventService
類。它會將幾個請求寫入BlockingQueue
中。在QueueService
中,咱們會看到請求是如何被取出並處理的。
package com.dzone.blockingqueue.example; public class EventService { public static void main(String arg[]) { try { EventData event = null; for (int i = 0; i < 100; i++) { event = new EventData(); event.setEventType("EventType " + i); event.setEventName("EventName " + i); QueueService.getInstance().putEventInQueue(event); Thread.sleep(100); } } catch (InterruptedException e) { e.printStackTrace(); } } }
輸出結果以下:
Process Event Data : Type : EventType 0 / Name : EventName 0 Process Event Data : Type : EventType 1 / Name : EventName 1 Process Event Data : Type : EventType 2 / Name : EventName 2 Process Event Data : Type : EventType 3 / Name : EventName 3 Process Event Data : Type : EventType 4 / Name : EventName 4
想要了解更多開發技術,面試教程以及互聯網公司內推,歡迎關注個人微信公衆號!將會不按期的發放福利哦~