初識ActiveMQ

  博主以前的一個高併發需求:Java併發(三):實例引出併發應用場景中所提到的,後來通過初步測試發現多線程並不能徹底知足需求,特別是性能上的需求,或者說多線程不是比較好的解決方案,真實需求是:將商品庫存(第三方數據庫上)"及時"通知第三方的網購平臺,達到同步商品餘量信息的目的,本地是存儲了相應的閾值,在第三方數據庫上的庫存一旦少於庫存,咱們就認爲這件商品已經售罄,由於要防止線上線下同一時間段銷售引發的庫存緊張,甚至訂單已經發出但庫存實際不足的狀況...以前多線程定時訪問庫存並同步數據顯然很是低效,主管老哥推薦我使用消息隊列來解決問題,頓時一臉懵,消息隊列是啥??html

消息隊列的基本概念:java

  消息隊列(Message queue)是一種進程間通訊或同一進程的不一樣線程間的通訊方式,軟件的貯列用來處理一系列的輸入,一般是來自用戶。消息隊列提供了異步的通訊協議,每個貯列中的紀錄包含詳細說明的數據,包含發生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不須要同時與消息隊列互交。消息會保存在隊列中,直到接收者取回它。 ——維基百科spring

 

博主使用的消息隊列中間件是ActiveMQ,爲何用它呢?數據庫

  1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
  2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
  3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性
  4. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  5. 支持經過JDBC和journal提供高速的消息持久化
  6. 從設計上保證了高性能的集羣,客戶端-服務器,點對點等等

以上的總結比較官方,歸納來講,ActiveMQ的優點在於它是Java語言開發,在基於Spring的項目上容易內嵌,很大程度的減小耦合,提供可靠的任務異步處理.apache

 

ActiveMQ的通訊模式:服務器

1.點對點(queue)session

  • 一個消息只能被一個服務接收
  • 消息一旦被消費,就會消失
  • 若是沒有被消費,就會一直等待,直到被消費
  • 多個服務監聽同一個消費空間,先到先得

2.發佈/訂閱模式(topic)多線程

  • 一個消息能夠被多個服務接收
  • 訂閱一個主題的消費者,只能消費自它訂閱以後發佈的消息
  • 消費端若是在生產端發送消息以後啓動,是接收不到消息的,除非生產端對消息進行了持久化(例如廣播,只有當時聽到的人能聽到信息)

 

如何實現?併發

業務需求是用發佈-訂閱模式完成,我負責消費者部分的代碼,一開始是這樣實現的,五步走:dom

  1. 經過鏈接工廠獲取鏈接對象
  2. 啓動鏈接
  3. 建立session
  4. 建立隊列或topic
  5. 註冊消息監聽

 

public class RoundRobinConfig1 {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Resource
    private InventoryService inventoryService;

    @Scheduled(cron = "0 53 * * * ?")//每2分鐘調度一次任務
    public void operation(){
        ConnectionFactory connectionFactory; // 鏈接工廠
        Connection connection = null; // 鏈接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageConsumer consumer; //建立消費者

        // 實例化鏈接工廠
        connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);

        try {
            connection=connectionFactory.createConnection(); // 經過鏈接工廠獲取鏈接
            connection.start(); // 啓動鏈接
            /**
             * 這裏的最好使用Boolean.FALSE,若是是用true則必須commit才能生效,且http://127.0.0.1:8161/admin管理頁面纔會更新消息隊列的變化狀況。
             */
            session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
//            destination=session.createQueue("FirstQueue1"); // 建立消息隊列
            destination=session.createTopic("firstTopic");
            consumer=session.createConsumer(destination);
            consumer.setMessageListener(new MyListener()); // 註冊消息監聽
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

具體業務邏輯寫在listener裏,你們使用時別忘了引入maven依賴

<!-- ActiveMQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.1</version>
        </dependency>

 

而後就進行初步測試,喜聞樂見地遇到問題了:

 

博主通過一通資料的查閱,依舊沒有搞懂問題所在,最後問同事要來了生產者的代碼,發現了問題可能出在這裏:

StompJmsConnectionFactory factory = new StompJmsConnectionFactory();

生產者用了這個鏈接工廠獲取鏈接,隨即百度了一下Stomp,瞭解到這實際上是一種消息格式協議,另外還有AMQP,OPENWIRE,MQTT等,幾種消息協議的概述能夠戳我,我便換成了StompJmsConnection對象來獲取鏈接,結果成功獲取到消息體:

 

因爲須要讓訂閱消息隊列的程序一直運行,我採起官方推薦的死循環方式處理,而且使其在模塊啓動時運行,後來考慮了一下,萬一死循環出現異常,那整個模塊不就宕了嗎,因而我給模塊建立了一個子進程用來輪詢消息隊列,這樣子進程就算掛了,整個模塊也不受影響了:

import com.google.gson.Gson;
import com.ycyz.framework.task.domain.Inventorycache;
import com.ycyz.framework.task.service.InventorycacheService;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.stomp.jms.StompJmsConnectionFactory;
import org.fusesource.stomp.jms.StompJmsDestination;
import org.fusesource.stomp.jms.message.StompJmsBytesMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author YHW
 * @ClassName: AutoRunner
 * @Description:
 * @date 2019/1/7 8:27
 */

@Order(value = 1)
@Component
public class AutoRunner implements ApplicationRunner {

    private Logger logger = LoggerFactory.getLogger(getClass());

    Map map = new HashMap(16);
    String result = null;
    String user = env("ACTIVEMQ_USER", "");
    String password = env("ACTIVEMQ_PASSWORD", "");
    String host = env("ACTIVEMQ_HOST", "域名");
    String destination = "/topic/bn.stock.prod";
    int port = Integer.parseInt(env("ACTIVEMQ_PORT", "端口號"));
    Destination dest = new StompJmsDestination(destination);

    @Resource
    private InventorycacheService inventorycacheService;

    Gson gson = new Gson();

    StompJmsConnectionFactory factory = new StompJmsConnectionFactory();

    @Override
    public void run(ApplicationArguments args) throws Exception{
        logger.info("開始運行了...");
        Thread thread = new Thread(){
            @Override
            public void run() {
                MessageConsumer consumer = null;
                Connection connection = null;
                long start = 0L;
                long count = 0L;
                try {
                    factory.setBrokerURI("tcp://" + host + ":" + port);
                    connection = factory.createConnection(user, password);
                    connection.start();
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    consumer = session.createConsumer(dest);
                    start = System.currentTimeMillis();
                    count = 1;
                    System.out.println("Waiting for messages...");

                    while (true) {
                        System.out.println("輪詢消息隊列..");
                        Message message = null;
                        try {
                            message = consumer.receive();
                            if (message instanceof StompJmsBytesMessage) {
                                StompJmsBytesMessage sm = (StompJmsBytesMessage) message;
                                Buffer buffer = sm.getContent();
                                byte[] a = buffer.getData();
                                result = new String(a);if (result.contains("SHUTDOWN")) {
                                    long diff = System.currentTimeMillis() - start;
                                    System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
                                    break;
                                }
                                //result是獲取到的消息字符串,這裏開始處理它
                            }
                        }catch(Exception e) {
                            e.printStackTrace();
                            continue;
                        }
                    }
                    connection.close();
                }catch(JMSException e) {
                    e.printStackTrace();
                }
            }
        };
        thread.start();

    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if( rc== null ){
            return defaultValue;
        }
        return rc;
    }

    private static void flagTrigger(Inventorycache inventorycache){
        if(new Integer(1).equals(inventorycache.getFlag())){
            inventorycache.setFlag(0);
        }else{
            inventorycache.setFlag(1);
        }
    }

    private static String getResult(String theWholeMessage){
        int startFlag = 0;
        int endFlag = 0;
        for (int i = 0; i < theWholeMessage.length(); i++) {
            if (theWholeMessage.charAt(i) == '{') {
                startFlag = i;
            } else if (theWholeMessage.charAt(i) == '}') {
                endFlag = i;
            }
        }
        return theWholeMessage.substring(startFlag, endFlag + 1);
    }
}

這樣就初步完成,生產者只管往隊列裏"塞"待處理消息,消費者只管"拿"消息來處理,作到了有效的應用程序解耦.

固然我也不肯定還有沒有更好的方案,博主才疏學淺,懂得太少,但願有看到的大牛可以不吝賜教,謝謝了

相關文章
相關標籤/搜索