Spring Boot-實現Apache ActiveMQ消息中間件

前言

在分佈式系統中消息通訊技術主要包括如下幾種:html

  • RPC(Remote Procedure Call Protocol):通常是C/S方式,同步的,跨語言跨平臺,面向過程。
  • CORBA(Common Object Request Broker Architecture):CORBA從概念上擴展了RPC。面向對象的,企業級的(面向對象中間件還有DCOM)。
  • RMI(Remote Method Invocation):面向對象方式的 Java RPC。
  • WebService:基於Web,C/S或B/S,跨系統跨平臺跨網絡。多爲同步調用, 實時性要求較高
  • MOM(Message oriented Middleware):面向消息中間件,主要適用於消息通道、消息總線、消息路由和發佈/訂閱的場景。目前主流標準有JMS(Java Message Service)、AMQP(Advanced Message Queuing Protocol)和STOMP(Streaming Text Oriented Messaging Protocol)。其中JMS是Java平臺上的面向接口的消息規範,是一套API標準,並有考慮異構系統。AMQP是一個面向協議的,跟語言平臺無關的消息傳遞應用層協議規範。STOMP是流文本定向消息協議,是一種爲MOM設計的簡單文本協議,在使用websocket通訊時可使用該協議來中繼消息中間件功能。AMQP和STOMP都是跟http處於同一層的協議,在 AMQP 模型中,消息的 producer 將 Message 發送給 Exchange,Exchange 負責交換/路由,將消息正確地轉發給相應的 Queue。消息的 Consumer 從 Queue 中讀取消息。

這些通訊方式中MOM消息中間件通常是分佈式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構,是大型分佈式系統不可缺乏的中間件。目前在生產環境,使用較多的消息隊列有ActiveMQRabbitMQZeroMQKafkaRocketMQ 等,本文介紹其中的一種ActiveMQ消息中間件來展開消息中間件的使用過程。
圖片描述java


What-什麼是ActiveMQ

ActiveMQ是一種開源的,實現了JMS規範的,面向消息(MOM)的中間件,爲應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通訊。它能夠部署於代理模式和P2P模式。徹底支持JMS1.1和J2EE 1.4規範。跨平臺的,多種語言和協議編寫客戶端,Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire, Stomp REST, WS Notification, XMPP, AMQP。如需配置ActiveMQ則須要在目標機器上安裝Java環境。支持集羣,同等網絡,自動檢測,TCP,SSL,廣播,持久化,XA,多個消息也能夠組成原子事務。
受權協議: Apache
開發語言: Java
操做系統: 跨平臺linux


Why-爲何要用ActiveMQ/消息中間件

要理解這個問題得從一個實際項目業務提及,舉個例子。
圖片描述web

業務場景:某財務系統A須要給某甲方公司開發合併報表業務,其中A系統的合併報表業務的基礎是須要憑證做爲基準源數據(A系統不生產憑證,只是憑證的搬運工)。可是甲方公司歷史已存在一套憑證系統B專門作憑證業務(B系統歷史憑證數量大概1000萬,同時天天會產生2萬數量的新憑證),外加合併報表業務處理的實時性不是很是嚴格,通常一個月出一次報表便可。所以就衍生出以下業務需求。
業務需求描述:B系統須要把歷史存在的憑證數據和天天新產生的憑證都須要推送給A系統,而後A系統根據傳過來的憑證作憑證的後續合併報表業務處理。
系統方案設計:爲了解決該業務需求,A系統按約定開發了一套基於WebService技術的接口服務來專門負責接收B系統的憑證數據,A系統接收憑證後須要作憑證數據合法性校驗、重複數據校驗、保存等及一系列業務處理。
初步處理方案:A系統針對每一個B系統傳來的接口請求線程的處理都包括憑證校驗、保存等處理都採用同步通訊處理完後即時反饋處理結果給B系統的。這種方式的處理形成單個請求線程處理耗時較長,平均算下來基本1條數據的處理大體得花費1秒時間。而這種處理方式生產過程當中存在很是大的性能問題,在B系統海量數據推送的狀況下,按1秒1條的話,B系統當天推送的憑證A系統當天來不及處理完,這樣就會堆積在次日繼續處理,可是B系統次日又會傳新的一批大量憑證,這樣就致使A系統處理的仍是第一天B系統傳輸的憑證數據,次日的數據一直壓着消費不了,時間一久系統的業務可用性大大下降。因此該方案須要廢棄優化。
優化處理方案:B系統每一個接口請求線程同步推送值A接口服務中,A系統接收後即時將消息轉發至消息中間件ActiveMQ的隊列中而後結束本次接口調用。系統A在內部單獨提供消息中間件的多個消費者來異步消費中間件隊列裏面的憑證數據,每消費一條消息就校驗保存操做,其中不知足業務的記錄將錯誤系統經過與B系統約定的另外一個接口將處理成功、處理錯誤信息反饋結果推送到B接口服務中,保存後的數據再經過計劃任務或者手工觸發來異步作合併業務處理,這樣再通過一系列細節優化後,最終實現了業務處理的高可用性。spring


經過上述例子咱們能夠看出消息中間件在處理業務起到了很大的做用,其做用取決於其擁有的特色,對於傳統的通訊通常分爲同步通訊(好比:消息中間件)和異步通訊(好比:RPC)。對於現在的分佈式系統,消息隊列已經演變爲獨立的消息中間件產品,相比於RPC同步通訊的方式來講有幾個明顯的優點:apache

  • 低耦合:不論是程序仍是模塊之間,使用消息中間件進行間接通訊。
  • 消息的順序性:消息隊列能夠保證消息的先進先出。
  • 消息可靠傳輸:持久化的存儲使得消息只有在被消費以後纔會刪除。
  • 異步通訊能力:相對於RPC來講,異步通訊使得生產者和消費者得以充分執行本身的邏輯而無需等待。
  • 緩衝/堆積能力:消息中間件像是一個巨大的蓄水池,將高峯期大量的請求存儲下來慢慢交給後臺進行處理,對於秒殺業務來講尤其重要,好比kafka具備強大的堆積能力能夠用於日誌通訊中間件。

可是異步通訊也存在程序設計和編程方面的複雜,同時對於實時性要求較高的業務也不能採用異步通訊,因此要根據業務具體分析。編程


消息中間件ActiveMQ的JMS支持兩種消息傳送模型:點對點消息通訊模型發佈訂閱模型圖片描述vim

  • 點對點(PTP)消息通訊模型:也可稱之爲隊列Queue模式,特定的一條消息只能被一個消費者消費。生產者將消息發送到指定的Queue當中,Broker(中間件)針對消息是否須要持久化進行持久化存儲後通知消費者進行處理,消費者處理完畢後發送一個回執(Acknowledge)給Broker,Broker認爲該消息已被正常消費,因而從持久化存儲中刪除該條消息,回執的發送邏輯內嵌在MQ的API中,無需主動調用。消費者一般能夠經過兩種方式獲取新消息:Push和Pull。Push方式:由ActiveMQ收到消息後主動調用消費者的新消息通知接口,須要消耗ActiveMQ寶貴的線程資源,同時消費者只能被動等待消息通知。Pull方式:由消費者輪詢調用 ActiveMQ API 去獲取消息,不消耗ActiveMQ 線程,消費者更加主動,雖然消費者的處理邏輯變得稍稍複雜。兩種方式的根本區別在於線程消耗問題,因爲ActiveMQ 的線程資源相對客戶端更加寶貴,Push方式會佔用ActiveMQ 過多的線程從而難以適應高併發的消息場景。同時當某一消費者離線一段時間再次上線後,大量積壓消息處理會消耗大量ActiveMQ 線程從而拖累其它消費者的消息處理,因此Pull方式相對來講更好(Kafka消息中間件已經拋棄了PUSH模式,全面擁抱PULL模式)。圖片描述
  • 發佈/訂閱模式(Pub/Sub):也可稱之爲主題Topic模式,特定的一條消息能夠被多個消費者所接收,只要消費者訂閱了某個主題。消息生產者(發佈者)將消息發送到某個稱爲主題(Topic)的虛擬通道中,Topic能夠被多個消費者訂閱,所以該模式相似於廣播的方式。發佈/訂閱模式採用PUSH的方式傳送消息,Subscriber只需保持在線便可。Subscriber分爲臨時性的和持久性的,當訂閱者離線時,ActiveMQ 會爲持久性的Sub持久化消息,當Sub恢復時會從新收到消息。可是既然採用Pub/Sub模式就代表容許部分消費者接收不到消息,因此一般會採用臨時性的Subscriber而不是持久性的。圖片描述

兩種模式選擇可在具體業務場景下選擇合適的模式來開發,業務要求實時性很是高的不建議使用消息中間件,一切以實際業務場景出發,避免亂使用形成數據紊亂引起騷動哈哈。
圖片描述後端


How-如何使用ActiveMQ

ActiveMQ依賴於Java,因此須要先安裝JDK,而後在安裝ActiveMQ消息中間件,而後編寫代碼實現。瀏覽器


JDK10安裝

下載JDK10

官網地址:http://www.oracle.com/technet...
圖片描述

安裝jdk10

將下載的jdk-10_linux-x64_bin.tar.gz文件拷貝到/usr/local/src目錄.
圖片描述

解壓文件到/usr/local/bin,先進入解壓目錄下cd /usr/local/bin,而後使用命令 tar -zxvf /usr/local/src/jdk-10_linux-x64_bin.tar.gz 執行解壓。
圖片描述
圖片描述

配置JAVA_HOME環境變量。
使用命令:vim /etc/profile 編輯配置文件,在vim中插入數據按鍵盤上的i或者insert,而後添加以下JAVA_HOME內容:

export JAVA_HOME=/usr/local/bin/jdk-10
export CLASSPATH=$JAVA_HOME/lib/
export PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH

圖片描述

按esc退出insert模式,再按:輸入wq,保存而且退出文件編輯。
圖片描述

重啓服務器或者執行配置當即生效命令,刷新配置文件生效命令(我用的這個方法):source /etc/profile ,或者執行重啓服務器命令:sudo shutdown -r now ,而後執行java -version驗證安裝是否成功。
圖片描述
出現版本號即表示安裝成功,JDK安裝成功後接下來安裝ActiveMQ。


ActiveMQ安裝

下載ActiveMQ

官網地址: http://activemq.apache.org/
圖片描述

安裝ActiveMQ

將下載的apache-activemq-5.15.3-bin.tar.gz文件拷貝到/usr/local/src目錄。
圖片描述

解壓文件到/usr/local/bin ,先進入解壓目錄下cd /usr/local/bin,而後使用命令 tar -zxvf /usr/local/src/apache-activemq-5.15.3-bin.tar.gz 執行解壓。
圖片描述

而後進入cd /usr/local/bin/apache-activemq-5.15.3/bin目錄下執行使用後端啓動模式命令: ./activemq start 運行activemq,執行後使用命令 ps -ef | grep -i activemq 查看進程。
圖片描述

啓動後能夠看到activemq默認使用了8161(監控平臺)和61616(tcp通訊服務)端口。
圖片描述

默認端口配置能夠本身修改,修改ActiveMQ的tcp通訊服務端口,修改服務地址和端口:打開conf/activemq.xml文件,找到以下部分,修改紅色部分便可: 
圖片描述

修改監控平臺地址和端口:打開conf/jetty.xml文件,找到以下部分,修改紅色部分便可: 
圖片描述

進入目錄cd /etc/sysconfig 執行命令 vim iptables 開啓8161, 61616防火牆端口。
圖片描述

而後刷新服務。
圖片描述

因爲我使用的是阿里雲的服務器部署的,因此須要登錄ECS雲服務器上配置安全組策略配置8161, 61616入口規則才能在外網訪問。
圖片描述
圖片描述

配置完後使用瀏覽器而後訪問http://47.93.63.64:8161/admin/進入管理平臺,默認登錄用戶名和密碼都爲admin。
圖片描述

配置了域名解析後也可使用域名訪問.
圖片描述
圖片描述

通過以上部署後準備單點服務的簡單版工做已作完,下面編寫代碼實現消息的生產和消費功能。


ActiveMQ代碼開發

原始的 ActiveMQ API 處理流程圖:
圖片描述

因爲Spring Boot提供了大量的自動配置和註解功能,已經把這部分代碼封裝好了,因此開發起來很方便。

工程添加ActiveMQ依賴

Gradle構建工具依賴添加:

// 集成Active MQ消息中間件
compile group: 'org.springframework.boot', name: 'spring-boot-starter-activemq', version: '2.0.0.RELEASE'

圖片描述


  • 編寫application.properties屬性配置
##################################---Active MQ消息中間件---##############################################
spring.activemq.broker-url=tcp://www.javalsj.com:61616
spring.activemq.user=admin
spring.activemq.password=admin

  • 編寫ActiveMQConfiguration配置類代碼:
package com.javalsj.blog.activemq;

import javax.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

/**
 * @description ActiveMQ消息隊列配置類
 * @author WANGJIHONG
 * @date 2018年3月25日 下午10:52:26 
 * @Copyright 版權全部 (c) www.javalsj.com
 * @memo 無備註說明
 */
@Configuration
public class ActiveMQConfiguration {
    
    /** 
     * 在Queue模式中,對消息的監聽須要對containerFactory進行配置
     */ 
    @Bean(ActiveMQQueueConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY)
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }
    
    /** 
     * 在Topic模式中,對消息的監聽須要對containerFactory進行配置
     */ 
    @Bean(ActiveMQTopicConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY)
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

  • 編寫ActiveMQQueueConst隊列常量類代碼:
package com.javalsj.blog.activemq;

/**
 * @description ActiveMQ隊列常量
 * @author WANGJIHONG
 * @date 2018年3月25日 下午10:59:47
 * @Copyright 版權全部 (c) www.javalsj.com
 * @memo 無備註說明
 */
public class ActiveMQQueueConst {

    /** 
     * 在Queue模式中,對消息的監聽須要對containerFactory進行配置,工廠標識
     */ 
    public static final String BEAN_NAME_JMSLISTENERCONTAINERFACTORY = "queueJmsListenerContainerFactory";    
    
    /**
     * 隊列消息標識_WebSocket的Java老司機聊天室
     */
    public static final String QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ = "queue.websocket.chatroom.javalsj";

}

  • 編寫ActiveMQQueueProducer隊列生產者代碼:
package com.javalsj.blog.activemq;

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * @description ActiveMQ消息生產者
 * @author WANGJIHONG
 * @date 2018年3月25日 下午10:57:54
 * @Copyright 版權全部 (c) www.javalsj.com
 * @memo 無備註說明
 */
@Component
public class ActiveMQQueueProducer {
    
    private final static Logger logger = LoggerFactory.getLogger(ActiveMQQueueProducer.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    /** 
     * 發送隊列消息
     * @param destinationName 消息目的地標識
     * @param message 消息文本
     */ 
    public void sendMsg(String destinationName, String message) {
        logger.info("發佈了一條隊列{}消息{}。", destinationName, message);
        Destination destination = new ActiveMQQueue(destinationName);
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
    
}

  • 編寫ActiveMQQueueConsumer隊列消費者代碼:
package com.javalsj.blog.activemq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * @description ActiveMQ隊列消息消費者
 * @author WANGJIHONG
 * @date 2018年3月25日 下午10:59:10
 * @Copyright 版權全部 (c) www.javalsj.com
 * @memo 無備註說明
 */
@Component
public class ActiveMQQueueConsumer {

    private final static Logger logger = LoggerFactory.getLogger(ActiveMQQueueConsumer.class);

    /**
     * WebSocket的Java老司機聊天室隊列消息消費者
     */
    @JmsListener(destination = ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ, containerFactory = ActiveMQQueueConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY)
    public void receiveQueueWebSocketJavalsjChatroomMsg(String message) {
        logger.info("消費了一條隊列{}消息{}。", ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ, message);
    }

}

  • 編寫ActiveMQTopicConst主題消息常量類
package com.javalsj.blog.activemq;

/**
 * @description ActiveMQ主題常量 
 * @author WANGJIHONG
 * @date 2018年3月25日 下午11:24:09 
 * @Copyright 版權全部 (c) www.javalsj.com
 * @memo 無備註說明
 */
public class ActiveMQTopicConst {
    
    /** 
     * 在Topic模式中,對消息的監聽須要對containerFactory進行配置,工廠標識
     */ 
    public static final String BEAN_NAME_JMSLISTENERCONTAINERFACTORY = "topicJmsListenerContainerFactory";    
    
    /**
     * 主題消息標識_WebSocket的系統公告
     */
    public static final String TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE = "topic.websocket.system.notice";
}

  • 編寫ActiveMQTopicPublisher主題消息發佈者代碼:
package com.javalsj.blog.activemq;

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * @description ActiveMQ主題消息發佈者
 * @author WANGJIHONG
 * @date 2018年3月25日 下午11:19:45 
 * @Copyright 版權全部 (c) www.javalsj.com
 * @memo 無備註說明
 */
@Component
public class ActiveMQTopicPublisher {
    private final static Logger logger = LoggerFactory.getLogger(ActiveMQTopicPublisher.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    /** 
     * 發佈主題消息
     */
    public void publishMsg(String destinationName, String message) {
        logger.info("發佈了一條主題{}消息{}。", destinationName, message);
        Destination destination = new ActiveMQTopic(destinationName);
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
}

  • 編寫ActiveMQTopicSubscriber主題消息訂閱者代碼:
package com.javalsj.blog.activemq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * @description ActiveMQ主題消息訂閱者
 * @author WANGJIHONG
 * @date 2018年3月25日 下午11:22:50
 * @Copyright 版權全部 (c) www.javalsj.com
 * @memo 無備註說明
 */
@Component
public class ActiveMQTopicSubscriber {

    private final static Logger logger = LoggerFactory.getLogger(ActiveMQTopicSubscriber.class);

    @JmsListener(destination = ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE, containerFactory = ActiveMQTopicConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY)
    public void subscribeTopicWebsocketSystemNoticeMsg(String message) {
        logger.info("消費了一條主題{}消息{}。", ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE, message);
    }
}

  • 編寫ActiveMQTest測試類
package com.javalsj.blog.activemq;

import java.time.Instant;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @description 測試
 * @author WANGJIHONG
 * @date 2018年3月25日 下午11:41:03
 * @Copyright 版權全部 (c) www.javalsj.com
 * @memo 無備註說明 
 */
@Component
@EnableScheduling
public class ActiveMQTest {

    @Autowired
    private ActiveMQQueueProducer activeMQQueueProducer;

    @Autowired
    private ActiveMQTopicPublisher activeMQTopicPublisher;

    @Scheduled(fixedRate = 10000, initialDelay = 3000)
    public void test() {
        activeMQQueueProducer.sendMsg(ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ,
                "隊列message" + Instant.now().toString());
        activeMQTopicPublisher.publishMsg(ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE,
                "主題message" + Instant.now().toString());
    }

}

重啓服務驗證功能使用狀況,至此簡單的ActiveMQ實例demo已完成。
圖片描述


總結

本文只是簡單的介紹了下Apache ActiveMQ消息中間件簡單使用,實際生產環境開發比這要複雜些,好比爲了高可用性通常都須要把 ActiveMQ 部署爲集羣,其中消費過程也須要作業務上的去重等等一系列細節處理,感興趣的能夠網上查查資料瞭解下高可用消息中間件的部署和使用。也能夠了解下其餘類型的消息中間件產品,每種都有合適的適用場景。寫了兩個小時,困了,幫我關下燈,嘿嘿。
圖片描述

相關文章
相關標籤/搜索