再多的話就不說了,這個是接着上一講:
【一塊兒學設計模式】狀態模式+裝飾器模式+簡單工廠模式實戰:(一)提交個訂單我到底經歷了什麼鬼?
一塊兒的,一些多餘的贅述請先看這個篇文章。html
一圖流,仍是上一篇文章中同樣的圖,接下來咱們就梳理下總結模式、觀察者模式、備忘錄模式的應用:java
訂單中心:
一、訂單中心建立訂單
二、訂單狀態流轉(狀態模式)
三、記錄操做日誌(裝飾器模式+簡單工廠模式)
四、訂單中心通知 庫存中心更新庫存mysql
調度中心:
一、庫存中心更新本地庫存(使用命令模式+模板方法模式+工廠模式)
這個上講已經說過:[【一塊兒學設計模式】命令模式+模板方法+工廠方法實戰: 如何優雅的更新商品庫存...][5]
二、將更新庫存數據放到消息中,調度中心消費消息(中介模式)
三、放入消息隊列中,判斷隊列是否放滿了,若是放滿了須要創建離線存儲(備忘錄模式)
四、異步監聽消息處理結果(觀察者模式)redis
這個模型應該很簡單,咱們來一步步拆解 一步步代碼分析sql
下單後,訂單中心調用庫存中心 扣減庫存,而後庫存中心調用調度中心,調度中心再去自身庫存扣減、WMS揀貨單生成、發貨單生成等一些列調度任務。數據庫
爲了解耦,庫存中心將須要發送的內容放到一個內存隊列中,調度中心異步去消費消息。設計模式
庫存中心提供給訂單中心接口app
/** * 通知庫存中心,「提交訂單」事件發生了 * @param orderDTO 訂單DTO * @return 處理結果 */ @Override public Boolean informSubmitOrderEvent(OrderInfoDTO orderDTO) { try { // 更新本地庫存 // do logic // 發送異步消息到內存隊列 StockUpdateMessage message = new StockUpdateMessage(); message.setId(UUID.randomUUID().toString().replace("-", "")); message.setOperation(GoodsStockUpdateOperation.SUBMIT_ORDER); message.setParameter(orderDTO); goodsStockUpdateQueue.put(message); // 監聽異步處理結果 goodsStockUpdateManager.observe(message.getId()); } catch (Exception e) { logger.error("error", e); return false; } return true; }
自定義一個內存隊列dom
/** * 商品庫存更新消息的隊列接口 * @author wangmeng * */ public interface StockUpdateQueue { /** * 將一個消息放入隊列 * @param message 消息 * @throws Exception */ void put(StockUpdateMessage message) throws Exception; /** * 直接將消息放入隊列 * @param message * @throws Exception */ void putDirect(StockUpdateMessage message) throws Exception; /** * 從隊列中取出一個消息 * @return * @throws Exception */ StockUpdateMessage take() throws Exception; /** * 獲取隊列大小 * @return * @throws Exception */ Integer size() throws Exception; } /** * 商品庫存更新隊列實現類 * @author wangmeng * */ @Component public class StockUpdateQueueImpl implements StockUpdateQueue { private static final Integer QUEUE_MAX_SIZE = 1000; /** * 離線存儲管理組件 */ @Autowired private OfflineStorageManager offlineStorageManager; /** * 商品庫存更新隊列 */ private ArrayBlockingQueue<StockUpdateMessage> queue = new ArrayBlockingQueue<StockUpdateMessage>(QUEUE_MAX_SIZE); /** * 將一個消息放入隊列 * @param message 消息 * @throws Exception */ @Override public void put(StockUpdateMessage message) throws Exception { queue.put(message); } /** * 從隊列中取出一個消息 * @return * @throws Exception */ @Override public StockUpdateMessage take() throws Exception { return queue.take(); } /** * 直接將消息放入隊列 * @param message * @throws Exception */ @Override public void putDirect(StockUpdateMessage message) throws Exception { queue.put(message); } /** * 獲取隊列大小 * @return * @throws Exception */ @Override public Integer size() throws Exception { return queue.size(); } }
自定義消息體異步
/** * 商品庫存更新消息 * @author wangmeng * */ @Data public class StockUpdateMessage { /** * id */ private String id; /** * 商品庫存更新操做 */ private Integer operation; /** * 核心參數數據 */ private Object parameter; }
調度中心消息消費者
/** * 庫存更新消息消費者 * @author wangmeng * */ @Component public class ScheduleStockUpdateMessageConsumer extends Thread { private static final Logger logger = LoggerFactory.getLogger( ScheduleStockUpdateMessageConsumer.class); /** * 庫存更新消息隊列 */ @Autowired private StockUpdateQueue stockUpdateQueue; /** * 調度中心接口 */ @Autowired private ScheduleService scheduleService; /** * 庫存中心的消息管理器 */ @Autowired private StockUpdateResultManager stockUpdateResultManager; /** * 消費庫存更新消息 */ @Override public void run() { while(true) { try { StockUpdateMessage message = stockUpdateQueue.take(); if(!isOrderRelatedMessage(message)) { continue; } OrderInfoDTO order = getOrderFromMessage(message); processMessage(message, order); stockUpdateResultManager.inform(message.getId(), true); } catch (Exception e) { logger.error("error", e); } } } /** * 是不是訂單相關的操做 * @param message 消息 * @return 是不是訂單相關的操做 * @throws Exception */ private Boolean isOrderRelatedMessage(StockUpdateMessage message) throws Exception { return GoodsStockUpdateOperation.SUBMIT_ORDER.equals(message.getOperation()) || GoodsStockUpdateOperation.CANCEL_ORDER.equals(message.getOperation()) || GoodsStockUpdateOperation.PAY_ORDER.equals(message.getOperation()); } /** * 從消息中獲取訂單 * @param message 消息 * @return 訂單 * @throws Exception */ private OrderInfoDTO getOrderFromMessage(StockUpdateMessage message) throws Exception { return (OrderInfoDTO) message.getParameter(); } /** * 處理消息 * @param order 訂單 * @return 處理結果 * @throws Exception */ private Boolean processMessage(StockUpdateMessage message, OrderInfoDTO order) throws Exception { if(GoodsStockUpdateOperation.SUBMIT_ORDER.equals(message.getOperation())) { return scheduleService.informSubmitOrderEvent(order); } else if(GoodsStockUpdateOperation.CANCEL_ORDER.equals(message.getOperation())) { return scheduleService.informCancelOrderEvent(order); } else if(GoodsStockUpdateOperation.PAY_ORDER.equals(message.getOperation())) { return scheduleService.informPayOrderEvent(order); } return false; } }
這裏咱們用的是一個內存阻塞隊列,那麼咱們就須要考慮若是消費者出現異常或者消費過慢的狀況致使消息阻塞該怎麼辦?
這裏咱們使用備忘錄模式 記錄隊列中隊列是否滿載,若是是則加入到離線存儲,保存到db中。若是隊列恢復size=0 再將離線數據放入隊列中。
消息放入隊列
/** * 將一個消息放入隊列 * @param message 消息 * @throws Exception */ public void put(StockUpdateMessage message) throws Exception { // 每次要往內存隊列放消息以前,先檢查一下離線存儲標識 // 若是觸發了離線存儲,直接就往離線存儲去寫入,不要走後面的邏輯了 // 寫完離線存儲以後,須要檢查一下內存隊列的大小,若是內存隊列已經清零,則啓動一個後臺線程 // 讓後臺線程去將離線存儲中的數據恢復寫入內存隊列中 if(offlineStorageManager.getOffline()) { offlineStorageManager.store(message); if(queue.size() == 0) { new OfflineResumeThread(offlineStorageManager, this).start(); } return; } // 若是內存隊列已經滿了,此時就觸發離線存儲 if(QUEUE_MAX_SIZE.equals(queue.size())) { offlineStorageManager.store(message); offlineStorageManager.setOffline(true); return; } queue.put(message); }
離線存儲管理器
/** * 離線存儲管理組件接口 * @author wangmeng * */ public interface OfflineStorageManager { /** * 離線存儲庫存更新消息 * @param message 庫存更新消息 * @throws Exception */ void store(StockUpdateMessage message) throws Exception; /** * 獲取離線存儲標識 * @return 離線存儲標識 * @throws Exception */ Boolean getOffline() throws Exception; /** * 設置離線存儲標識 * @param offline 離線存儲標識 * @throws Exception */ void setOffline(Boolean offline) throws Exception; /** * 所謂的迭代器模式,何時用? * * 其實只有一個場景,就是若是你須要基於一些不支持迭代的數據,來讓咱們業務代碼進行迭代 * 那麼你本身就要去實現基於那個數據的一套迭代代碼 * 以迭代器的方式返回回去給業務方,來經過你定義的迭代器,進行數據的迭代 * * mysql數據庫,自己是不支持迭代式訪問的,可是咱們能夠本身實現一套基於mysql的迭代訪問的代碼 * 把一個迭代器給返回回去 * * 好比有的時候,咱們可能還須要基於es、redis的數據,來提供業務方迭代式訪問的功能,那麼此時就只能咱們本身 * 去封裝迭代器,在裏面封裝基於es、redis的迭代訪問數據的邏輯 * */ /** * 獲取迭代器 * @return 迭代器 * @throws Exception */ OfflineStorageIterator iterator() throws Exception; /** * 批量刪除庫存更新消息 * @param stockUpdateMessages 庫存更新消息 * @throws Exception */ void removeByBatch(List<StockUpdateMessage> stockUpdateMessages) throws Exception; } /** * 離線存儲管理組件 * @author wangmeng * */ @Component public class OfflineStorageManagerImpl implements OfflineStorageManager { /** * 庫存更新消息管理模塊DAO組件 */ @Autowired private StockUpdateMessageDAO stockUpdateMessageDAO; /** * 是否觸發離線存儲的標識 */ private Boolean offline = false; /** * 離線存儲庫存更新消息 * @param message 庫存更新消息 * @throws Exception */ @Override public void store(StockUpdateMessage message) throws Exception { StockUpdateMessageDO stockUpdateMessageDO = createStockUpdateMessageDO(message); stockUpdateMessageDAO.save(stockUpdateMessageDO); } /** * 建立庫存更新消息DO對象 * @param message 庫存更新消息 * @return 庫存更新消息DO對象 * @throws Exception */ private StockUpdateMessageDO createStockUpdateMessageDO( StockUpdateMessage message) throws Exception { StockUpdateMessageDO stockUpdateMessageDO = new StockUpdateMessageDO(); stockUpdateMessageDO.setMessageId(message.getId()); stockUpdateMessageDO.setOperation(message.getOperation()); stockUpdateMessageDO.setParameter(JSONObject.toJSONString(message.getParameter())); stockUpdateMessageDO.setParamterClazz(message.getParameter().getClass().getName()); stockUpdateMessageDO.setGmtCreate(new Date()); stockUpdateMessageDO.setGmtModified(new Date()); return stockUpdateMessageDO; } /** * 獲取離線存儲標識 * @return 離線存儲標識 * @throws Exception */ @Override public Boolean getOffline() throws Exception { return offline; } /** * 設置離線存儲標識 * @param offline 離線存儲標識 * @throws Exception */ @Override public void setOffline(Boolean offline) throws Exception { this.offline = offline; } /** * 批量刪除庫存更新消息 * @param stockUpdateMessages 庫存更新消息 * @throws Exception */ @Override public void removeByBatch(List<StockUpdateMessage> stockUpdateMessages) throws Exception { StringBuilder builder = new StringBuilder(""); for(int i = 0; i < stockUpdateMessages.size(); i++) { builder.append(stockUpdateMessages.get(i).getId()); if(i < stockUpdateMessages.size() - 1) { builder.append(","); } } stockUpdateMessageDAO.removeByBatch(builder.toString()); } /** * 獲取離線數據迭代器 * @throws Exception */ @Override public OfflineStorageIterator iterator() throws Exception { return new OfflineStorageIteratorImpl(); } /** * 離線數據迭代器 * @author zhonghuashishan * */ public class OfflineStorageIteratorImpl implements OfflineStorageIterator { /** * 判斷是否還有下一批庫存更新消息 * @return 是否還有下一批庫存更新消息 * @throws Exception */ @Override public Boolean hasNext() throws Exception { return stockUpdateMessageDAO.count().equals(0L) ? false : true; } /** * 獲取下一批庫存更新消息 * @return 下一批庫存更新消息 * @throws Exception */ @Override public List<StockUpdateMessage> next() throws Exception { List<StockUpdateMessage> stockUpdateMessages = new ArrayList<StockUpdateMessage>(); List<StockUpdateMessageDO> stockUpdateMessageDOs = stockUpdateMessageDAO.listByBatch(); for(StockUpdateMessageDO stockUpdateMessageDO : stockUpdateMessageDOs) { StockUpdateMessage stockUpdateMessage = new StockUpdateMessage(); stockUpdateMessage.setId(stockUpdateMessageDO.getMessageId()); stockUpdateMessage.setOperation(stockUpdateMessageDO.getOperation()); stockUpdateMessage.setParameter(JSONObject.parseObject(stockUpdateMessageDO.getParameter(), Class.forName(stockUpdateMessageDO.getParamterClazz()))); stockUpdateMessages.add(stockUpdateMessage); } return stockUpdateMessages; } } }
離線數據恢復類
/** * 離線數據恢復線程 * @author wangmeng * */ public class OfflineResumeThread extends Thread { private static final Logger logger = LoggerFactory.getLogger(OfflineResumeThread.class); /** * 離線存儲管理組件 */ private OfflineStorageManager offlineStorageManager; /** * 庫存更新隊列 */ private StockUpdateQueue stockUpdateQueue; /** * 構造函數 * @param offlineStorageManager 離線存儲管理組件 */ public OfflineResumeThread(OfflineStorageManager offlineStorageManager, StockUpdateQueue stockUpdateQueue) { this.offlineStorageManager = offlineStorageManager; this.stockUpdateQueue = stockUpdateQueue; } /** * 執行線程 */ @Override public void run() { try { // 若是表中還有數據的話 OfflineStorageIterator offlineStorageIterator = offlineStorageManager.iterator(); while(offlineStorageIterator.hasNext()) { try { // 每次就從mysql中查詢50條數據,批量查詢,批量處理,批量刪除 List<StockUpdateMessage> stockUpdateMessages = offlineStorageIterator.next(); // 將這批數據寫入內存隊列中 for(StockUpdateMessage message : stockUpdateMessages) { stockUpdateQueue.putDirect(message); } // 批量刪除這批數據 offlineStorageManager.removeByBatch(stockUpdateMessages); } catch (Exception e) { logger.error("error", e); } } // 此時mysql中的數據所有恢復完,更新內存標識 offlineStorageManager.setOffline(false); } catch (Exception e) { logger.error("error", e); } } }
咱們在上面 其實已經有了一端代碼 是描述異步監聽消費結果的,這裏再來具體貼下 觀察者、被觀察者的代碼。
被觀察者
/** * 商品庫存更新結果觀察目標 * @author wangmeng * */ public class StockUpdateObservable extends Observable { /** * 消息id */ private String messageId; /** * 構造函數 * @param messageId 消息id */ public StockUpdateObservable(String messageId) { this.messageId = messageId; } /** * 設置商品庫存更新結果 * @param result 商品庫存更新結果 */ public void setResult(Boolean result) { StockUpdateResult goodsStockUpdateResult = new StockUpdateResult(); goodsStockUpdateResult.setMessageId(messageId); goodsStockUpdateResult.setResult(result); this.setChanged(); this.notifyObservers(goodsStockUpdateResult); } public String getMessageId() { return messageId; } }
觀察者
/** * 商品庫存更新結果觀察者 * @author wangmeng * */ @Component public class StockUpdateObserver implements Observer { private static final Logger logger = LoggerFactory.getLogger( StockUpdateObserver.class); /** * 通知異步處理結果 */ @Override public void update(Observable o, Object arg) { StockUpdateResult result = (StockUpdateResult) arg; logger.info("商品庫存更新消息[messageId=" + result.getMessageId() + "]" + "的異步處理結果爲:" + result.getResult()); } }
添加觀察者
observe方法是訂單中心通知庫存中心更新庫存的時候調用的,庫存中心給調度中心發送異步消息,而後將這個消息的messageId加入到觀察者中。
inform方法是調度中心的消息消費者調用的,若是消費成功,調度中心會調用inform方法,設置result=true
```java /** * 商品庫存更新結果管理組件 * @author wangmeng * */ @Component public class StockUpdateResultManagerImpl implements StockUpdateResultManager { /** * 商品庫存更新結果map */ private Map<String, StockUpdateObservable> observableMap = new ConcurrentHashMap<String, StockUpdateObservable>(); /** * 商品庫存更新結果觀察者 */ @Autowired private StockUpdateObserver observer; /** * 設置對商品庫存更新結果的觀察 * @param messageId 消息id * @param result 商品庫存更新結果 * @param observer 商品庫存更新結果的觀察者 */ @Override public void observe(String messageId) { StockUpdateObservable observable = new StockUpdateObservable(messageId); observable.addObserver(observer); observableMap.put(messageId, observable); } /** * 獲取商品庫存更新結果的觀察目標 * @param messageId 商品庫存更新消息id * @return 商品庫存更新結果的觀察目標 */ @Override public void inform(String messageId, Boolean result) { StockUpdateObservable observable = observableMap.get(messageId); observable.setResult(result); observableMap.remove(messageId); } /** * 獲取庫存更新結果觀察目標 * @param messageId 消息id * @return */ @Override public StockUpdateObservable getObservable(String messageId) { return observableMap.get(messageId); } } ```
本篇內容有點多,主要是分爲了三大塊,而後結合了中介者模式、備忘錄模式、觀察者模式。
其中在離線消息恢復的類中仍是用了迭代器模式。
代碼作了簡單的抽離,我相信讀起來仍是很輕鬆的,設計模式系列要先告一段落了,這幾篇文章涉及了 一些經常使用的設計模式,後面若是有新的模式還會繼續連載更新。
本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公衆號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小夥伴可關注我的公衆號:壹枝花算不算浪漫