博主以前的一個高併發需求:Java併發(三):實例引出併發應用場景中所提到的,後來通過初步測試發現多線程並不能徹底知足需求,特別是性能上的需求,或者說多線程不是比較好的解決方案,真實需求是:將商品庫存(第三方數據庫上)"及時"通知第三方的網購平臺,達到同步商品餘量信息的目的,本地是存儲了相應的閾值,在第三方數據庫上的庫存一旦少於庫存,咱們就認爲這件商品已經售罄,由於要防止線上線下同一時間段銷售引發的庫存緊張,甚至訂單已經發出但庫存實際不足的狀況...以前多線程定時訪問庫存並同步數據顯然很是低效,主管老哥推薦我使用消息隊列來解決問題,頓時一臉懵,消息隊列是啥??html
消息隊列的基本概念:java
消息隊列(Message queue)是一種進程間通訊或同一進程的不一樣線程間的通訊方式,軟件的貯列用來處理一系列的輸入,一般是來自用戶。消息隊列提供了異步的通訊協議,每個貯列中的紀錄包含詳細說明的數據,包含發生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不須要同時與消息隊列互交。消息會保存在隊列中,直到接收者取回它。 ——維基百科spring
博主使用的消息隊列中間件是ActiveMQ,爲何用它呢?數據庫
以上的總結比較官方,歸納來講,ActiveMQ的優點在於它是Java語言開發,在基於Spring的項目上容易內嵌,很大程度的減小耦合,提供可靠的任務異步處理.apache
ActiveMQ的通訊模式:服務器
1.點對點(queue)session
2.發佈/訂閱模式(topic)多線程
如何實現?併發
業務需求是用發佈-訂閱模式完成,我負責消費者部分的代碼,一開始是這樣實現的,五步走:dom
public class RoundRobinConfig1 { private Logger logger = LoggerFactory.getLogger(getClass()); @Resource private InventoryService inventoryService; @Scheduled(cron = "0 53 * * * ?")//每2分鐘調度一次任務 public void operation(){ ConnectionFactory connectionFactory; // 鏈接工廠 Connection connection = null; // 鏈接 Session session; // 會話 接受或者發送消息的線程 Destination destination; // 消息的目的地 MessageConsumer consumer; //建立消費者 // 實例化鏈接工廠 connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection=connectionFactory.createConnection(); // 經過鏈接工廠獲取鏈接 connection.start(); // 啓動鏈接 /** * 這裏的最好使用Boolean.FALSE,若是是用true則必須commit才能生效,且http://127.0.0.1:8161/admin管理頁面纔會更新消息隊列的變化狀況。 */ session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session // destination=session.createQueue("FirstQueue1"); // 建立消息隊列 destination=session.createTopic("firstTopic"); consumer=session.createConsumer(destination); consumer.setMessageListener(new MyListener()); // 註冊消息監聽 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
具體業務邏輯寫在listener裏,你們使用時別忘了引入maven依賴
<!-- ActiveMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.1</version> </dependency>
而後就進行初步測試,喜聞樂見地遇到問題了:
博主通過一通資料的查閱,依舊沒有搞懂問題所在,最後問同事要來了生產者的代碼,發現了問題可能出在這裏:
StompJmsConnectionFactory factory = new StompJmsConnectionFactory();
生產者用了這個鏈接工廠獲取鏈接,隨即百度了一下Stomp,瞭解到這實際上是一種消息格式協議,另外還有AMQP,OPENWIRE,MQTT等,幾種消息協議的概述能夠戳我,我便換成了StompJmsConnection對象來獲取鏈接,結果成功獲取到消息體:
因爲須要讓訂閱消息隊列的程序一直運行,我採起官方推薦的死循環方式處理,而且使其在模塊啓動時運行,後來考慮了一下,萬一死循環出現異常,那整個模塊不就宕了嗎,因而我給模塊建立了一個子進程用來輪詢消息隊列,這樣子進程就算掛了,整個模塊也不受影響了:
import com.google.gson.Gson; import com.ycyz.framework.task.domain.Inventorycache; import com.ycyz.framework.task.service.InventorycacheService; import org.fusesource.hawtbuf.Buffer; import org.fusesource.stomp.jms.StompJmsConnectionFactory; import org.fusesource.stomp.jms.StompJmsDestination; import org.fusesource.stomp.jms.message.StompJmsBytesMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.jms.*; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author YHW * @ClassName: AutoRunner * @Description: * @date 2019/1/7 8:27 */ @Order(value = 1) @Component public class AutoRunner implements ApplicationRunner { private Logger logger = LoggerFactory.getLogger(getClass()); Map map = new HashMap(16); String result = null; String user = env("ACTIVEMQ_USER", ""); String password = env("ACTIVEMQ_PASSWORD", ""); String host = env("ACTIVEMQ_HOST", "域名"); String destination = "/topic/bn.stock.prod"; int port = Integer.parseInt(env("ACTIVEMQ_PORT", "端口號")); Destination dest = new StompJmsDestination(destination); @Resource private InventorycacheService inventorycacheService; Gson gson = new Gson(); StompJmsConnectionFactory factory = new StompJmsConnectionFactory(); @Override public void run(ApplicationArguments args) throws Exception{ logger.info("開始運行了..."); Thread thread = new Thread(){ @Override public void run() { MessageConsumer consumer = null; Connection connection = null; long start = 0L; long count = 0L; try { factory.setBrokerURI("tcp://" + host + ":" + port); connection = factory.createConnection(user, password); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(dest); start = System.currentTimeMillis(); count = 1; System.out.println("Waiting for messages..."); while (true) { System.out.println("輪詢消息隊列.."); Message message = null; try { message = consumer.receive(); if (message instanceof StompJmsBytesMessage) { StompJmsBytesMessage sm = (StompJmsBytesMessage) message; Buffer buffer = sm.getContent(); byte[] a = buffer.getData(); result = new String(a);if (result.contains("SHUTDOWN")) { long diff = System.currentTimeMillis() - start; System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0))); break; } //result是獲取到的消息字符串,這裏開始處理它 } }catch(Exception e) { e.printStackTrace(); continue; } } connection.close(); }catch(JMSException e) { e.printStackTrace(); } } }; thread.start(); } private static String env(String key, String defaultValue) { String rc = System.getenv(key); if( rc== null ){ return defaultValue; } return rc; } private static void flagTrigger(Inventorycache inventorycache){ if(new Integer(1).equals(inventorycache.getFlag())){ inventorycache.setFlag(0); }else{ inventorycache.setFlag(1); } } private static String getResult(String theWholeMessage){ int startFlag = 0; int endFlag = 0; for (int i = 0; i < theWholeMessage.length(); i++) { if (theWholeMessage.charAt(i) == '{') { startFlag = i; } else if (theWholeMessage.charAt(i) == '}') { endFlag = i; } } return theWholeMessage.substring(startFlag, endFlag + 1); } }
這樣就初步完成,生產者只管往隊列裏"塞"待處理消息,消費者只管"拿"消息來處理,作到了有效的應用程序解耦.
固然我也不肯定還有沒有更好的方案,博主才疏學淺,懂得太少,但願有看到的大牛可以不吝賜教,謝謝了