手把手教你如何玩轉消息中間件(ActiveMQ)

手把手教你如何玩轉消息中間件(ActiveMQ)

 版權聲明:本文爲博主原創文章,未經博主容許不得轉載。 https://blog.csdn.net/Cs_hnu_scw/article/details/81040834

#情景引入
小白:起牀起牀起牀起牀。。。。快起牀~
我:怎麼了又,大驚小怪,嚇到我了。
小白:我有事有事想找你,十萬火急呢~~
我:你能有什麼事?反正我不信。。那你說說看~~
小白:就是我有兩個小表弟,叫大白和二白,他們如今天天睡覺以前都要分別和我聊天,讓我給他們講故事,若是不講他們就不睡覺。可是,若是一個個的跟他們輪流來講的話,我就須要天天說兩遍,並且我還要找準他們的時間點,這個有時候我有事情都沒法實現這個問題,他們就會很生氣。。。
我:這不是挺好的嘛,小孩子就是愛聽故事的呀。。。
小白:我也願意講,可是時間這個不是很好控制,有沒有相似,好比我能夠以前就描述好了,而後定點給他們兩個一塊兒發消息,而能夠拋開時間和其餘因素的影響呢?
我:這個嘛,很簡單呀,你可讓他們關注你的一個公衆號,這樣你再定時的推送給他們故事不就能夠了嘛。。或者,你能夠拉他們進你的一個羣這樣,就方便了呀~
小白:這樣是能夠,可是若是之後還有小表妹要聽我講,我就要如此反覆的作。。感謝好麻煩好麻煩。。。
我:emmm,我理解你的意思,你就想實現一種不少人都可以進行相似一種消息推送的方式嘛。。。
小白:對的對的。。就是這樣一種,,,我記得咱們在技術方面好像也有一種相似的技術,這個叫作什麼去了呢?
我:這就是消息中間件,一種生產者和消費者的關係。
小白:我也想學我也想學,,你快給我講講,給我講講唄。。
我:真拿你沒辦法,好吧。。。下面我就給你講一下這方面的知識。
#情景分析
其實,小白的這個問題,是一種比較廣泛的問題。既然咱們做爲技術人員,固然咱們就要從技術成分去分析如何解決了。這裏面其實就是包含着一種消息中間件的技術。它也是最近技術層面用得很是很是多的,這也是很是值得咱們進行學習。。這在現在的秒殺系統,推薦系統等等,都有普遍的應用。。因此,這章我就主要來跟你們說說這方面的知識。
#基本概念的引導
本模塊主要講解關於消息中間件的相關基礎知識,也是方便咱們後面的學習。
###什麼是中間件?
非操做系統軟件,非業務應用軟件,不是直接給最終用戶使用,不能直接給用戶帶來價值的軟件,咱們就能夠稱爲中間件(好比Dubbo,Tomcat,Jetty,Jboss都是屬於的)。
###什麼是消息中間件?
百度百科解釋:消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。
關鍵點:關注於數據的發送和接受,利用高效可靠的異步消息機制傳遞機制集成分佈式系統。
先簡單的用下面這個圖說明:
這裏寫圖片描述
###爲何要使用消息中間件
舉幾個例子,我想你就會明白了。(其實使用消息中間件主要就是爲了解耦合和異步兩個做用)
1:微博,都用過吧。那麼,當咱們新關注一個用戶,那麼系統會相應的推送消息給咱們,而且還作了不少關於咱們關注的處理。這就是消息中間件的異步。
2:秒殺系統。100件商品,幾十萬我的在搶,那這個怎麼弄呢?總不能就把服務器給宕機了吧。那麼就能夠把用戶的請求進行緩存,而後再異步處理。
3:系統A給系統B進行通訊,而系統B須要對A的消息進行相應處理以後才能給A反饋,這時候,總不能讓A就傻傻等着吧。那麼,這就是異步的功能。
###什麼是JMS?
Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
總結起來講就是:Java對於應用程序之間進行信息交互的API(並且是異步)。
裏面有下面的概念須要理解,對後續有幫助:html

  • 提供者:實現JMS的消息服務中間件服務器。java

  • 客戶端:發送或接受消息的應用。web

  • 生產者/發佈者:建立併發送消息的客戶端。spring

  • 消費者/訂閱者:接受並處理消息的客戶端。sql

  • 消息:應用程序之間傳遞的數據。apache

  • 消息模式:在客戶端之間傳遞消息的模式,JMS主要是隊列模式和主體模式。windows

  • 隊列模式特色:
    (1)客戶端包括生產者和消費者。
    (2)隊列中的一個消息只能被一個消費者使用。
    (3)消費者能夠隨時取消息。瀏覽器

  • 主體模式特色:
    (1)客戶端包括髮布者和訂閱者。
    (2)主題中的消息能夠被全部訂閱者消費。
    (3)消費者不能消費訂閱以前發送的消息。
    ###什麼是AMQP?
    AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣的開發語言等條件的限制。
    簡單點說:就是對於消息中間件所接受的消息傳輸層的協議(不懂傳輸層,那麼就須要多看看計算機網絡相關知識了,OSI的層次劃分),只有這樣才能保證客戶端和消息中間件可以進行交互(換位思考:HTTP和HTTPS甚至說是TCP/IP與UDP協議都要的道理)。
    emmm,比較一下JMS和AMQP的不一樣吧。。緩存

  • JMS是定義與Java,而AMQP是一種傳輸層協議。tomcat

  • JMS是屬於Java的API,而AMQP是跨語言的。

  • JMS消息類型只有兩種(主題和隊列,後續會說),而AMQP是有五種。

  • JMS主要就是針對Java的開發的Client,而AMQP是面向消息,隊列,路由。
    ###什麼是ActiveMQ呢?
    ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
    簡單點說:不就是爲了實現我上述所想要的需求嘛。而後它就是一種實現的方式。就好比,Tomcat是什麼?不就是爲了實現一種client與服務器之間的交互的一種產品嘛。。因此,不須要死記概念,本身理解就好。
    #ActiveMQ的安裝
    ##環境:Windows
    步驟:
    (1)登陸到ActiveMQ的官網,下載安裝包。http://activemq.apache.org/activemq-5154-release.html
    (2)下載Zip文件
    這裏寫圖片描述
    (3)解壓Zip文件,目錄以下
    這裏寫圖片描述
    (4)啓動ActiveMQ服務(注意:要右鍵以管理員身份進行運行)
    這裏寫圖片描述
    注意:有兩種方式,第一種就是相似tomcat啓動,那麼啓動圖會一直顯示。
    而第二種的話,就是把這個ActiveMQ註冊到服務列表中,這樣更方便咱們進行操做。(推薦使用這種)
    (5)登陸,驗證是否啓動成功
    這裏寫圖片描述
    (6)進入管理頁面
    這裏寫圖片描述
    OK,進入以後就能夠看咱們的管理頁面啦。。。是否是很簡單呢?
    ##環境:Linux
    步驟:(多餘的我就很少說了。。。請看windows的步驟)
    (1)一樣須要下載對應的文件。後綴爲tar.gz的這樣的。其實能夠直接經過下面的這個命令下載,快速一點,省得要移動到Linux(注意:若是是經過ssh鏈接的方式的話)。

wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz
  • 1

(2)而後解壓下載的文件
(3)一樣進入相對應的目錄,運行

./activemq start
  • 1

(4)而後再訪問相同的地址就能夠看到啦。(具體看windows安裝步驟)
#ActiveMQ的使用(基於Maven)
首先要再回頭看看JMS中的一些關鍵接口。

  • ConnectionFactory:用於建立鏈接到消息中間件的鏈接工廠。
  • Connection:表明了應用程序和服務之間的鏈接通路。
  • Destination:指消息發佈的地點,包括隊列模式和主體模式。
  • Session:表示一個單線程的上下文,用於發送和接受消息。
  • MessageConsumer:由會話建立,用於接受發送到目的的消息。
  • MessageProducer:由會話建立,用於發送消息。
  • Message:是在消費者和生產者之間傳遞的對象,消息頭,一組消息屬性,和一個消息體。
    這裏寫圖片描述
    環境:IDEA
    步驟:
  1. 使用IDEA建立一個Maven項目,最簡單的骨架便可(quick)
  2. 導入ActiveMq的依賴
<!--添加activemq的依賴-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.9.0</version>
    </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

###情形一:隊列模型的消息
3. 編寫生產者代碼(使用隊列模型的消息)

package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:06 2018/7/14 0014
 * @ Description:用於消息的建立類
 * @ Modified By:
 * @Version: $version$
 */
public class MessageProducer {
    //定義ActivMQ的鏈接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發送消息的隊列名稱
    private static final String QUEUE_NAME = "MyMessage";

    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
       //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createQueue(QUEUE_NAME);
        //建立一個生產者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //建立模擬100個消息
        for (int i = 1 ; i <= 100 ; i++){
            TextMessage message = session.createTextMessage("我發送message:" + i);
            //發送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我如今發的消息是:" + message.getText());
        }
        //關閉鏈接
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  1. 查看是否消息產生成功
    這裏寫圖片描述
  2. 編寫消費者代碼(消費隊列模型的消息)
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:30 2018/7/14 0014
 * @ Description:消息消費者
 * @ Modified By:
 * @Version: $version$
 */
public class MessageConsumer {
    //定義ActivMQ的鏈接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發送消息的隊列名稱
    private static final String QUEUE_NAME = "MyMessage";
    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createQueue(QUEUE_NAME);
        //建立消費者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //建立消費的監聽
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("獲取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  1. 查看是否進行了消費
    這裏寫圖片描述
    **備註:**我上面進行的是隊列模式的消息,並且進行的都是單個消費者,那若是我換成同時有兩個消費者消費生產者的消息會怎麼樣呢?(咱們只須要運行兩個消費者就能夠啦。固然,要保證生產者是產生了消息的哦~~~~不然,拿什麼消費呢~)
    一個生產者,兩個消費者的狀況以下:
    切記:先運行兩個消費者,而後再運行生產者代碼:
    結果以下:
    這裏寫圖片描述
    這裏寫圖片描述

    其實,這就是解釋了,我以前說的,隊列模式的消息,是隻會被一個消費者所使用的,而不會被共享,這也就是和主題模型的差異哦~~~哈哈
    ###情形二:主題模型的消息
    前面的步驟都同樣,只是生產者和消費者的代碼有點區別:

  2. 編寫生產者(這個和隊列模型其實很像,稍微修改就能夠)

package com.hnu.scw.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:48 2018/7/14 0014
 * @ Description:${description}
 * @ Modified By:
 * @Version: $version$
 */
public class MessageTopicProducer {

    //定義ActivMQ的鏈接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發送消息的主題名稱
    private static final String TOPIC_NAME = "MyTopicMessage";

    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createTopic(TOPIC_NAME);
        //建立一個生產者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //建立模擬100個消息
        for (int i = 1; i <= 100; i++) {
            TextMessage message = session.createTextMessage("當前message是(主題模型):" + i);
            //發送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我如今發的消息是:" + message.getText());
        }
        //關閉鏈接
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  1. 查看生產者的消息
    這裏寫圖片描述
  2. 編寫消費者
package com.hnu.scw.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:50 2018/7/14 0014
 * @ Description:${description}
 * @ Modified By:
 * @Version: $version$
 */
public class MessageTopicConsumer {
    //定義ActivMQ的鏈接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發送消息的隊列名稱
    private static final String TOPIC_NAME = "MyTopicMessage";
    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createTopic(TOPIC_NAME);
        //建立消費者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //建立消費的監聽
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("獲取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  1. 查看是否消費成功
    然而,咱們運行消費者代碼,發現怎麼沒有消息消費呢?????????
    其實,這就是主題模型的一個特色,若是消費者是在生產者產生消息以後來的,那麼是不會對以前的消息進行消費的哦。。。如今知道它們的區別在哪了吧。
    若是,如今是兩個消費者和一個生產者的主題模型又是怎麼的結果呢?
    這裏寫圖片描述
    這裏寫圖片描述
    哎喲。。。。這種狀況消費者都各自消費了全部的生產者的消息耶。。。。。這就是共享性消息的主題模式,這就是和隊列模型的區別,,,你們好好的對比哦~~
    #ActiveMQ使用(基於Spring)
    步驟:
  2. 建立一個Maven項目(基於最簡單的quick骨架便可)
  3. 導入Spring和ActiveMQ的相關依賴
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hnu.scw</groupId>
  <artifactId>activemq</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>activemq</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <spring.version>4.2.5.RELEASE</spring.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!--添加activemq的依賴-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.9.0</version>
    </dependency>

    <!--spring整合activemq所須要的依賴-->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.7.0</version>
      <exclusions>
        <exclusion>
          <artifactId>spring-context</artifactId>
          <groupId>org.springframework</groupId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.7.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.20.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  1. 編寫生產者的配置文件.xml,取名爲producer.xml
<?xml version="1.0" encoding="UTF-8" ?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd ">

    <context:annotation-config />

    <!--Activemq的鏈接工廠-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
   <!--spring jms爲咱們提供的鏈接池 獲取一個鏈接工廠-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- 消息目的地  點對點的模式-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="SpringActiveMQMsg"/>
    </bean>
    <!-- jms模板  用於進行消息發送-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  1. 編寫生產者的接口
package com.hnu.scw.spring;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 12:19 2018/7/14 0014
 * @ Description:生產者的接口
 * @ Modified By:
 * @Version: $version$
 */
public interface ProduceService {
    void sendMessage(String msg);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 編寫生產者的實現
package com.hnu.scw.spring;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 2:21 2018/7/15 0015
 * @ Description:生產者的實現類
 * @ Modified By:
 * @Version: $version$
 */

public class ProduceServiceImpl implements ProduceService {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name = "queueDestination")
    private Destination destination;

    /**
     * 發送消息
     * @param msg
     */
    @Override
    public void sendMessage(final String msg) {
        jmsTemplate.send(destination , new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(msg);
                return textMessage;
            }
        });
        System.out.println("如今發送的消息爲: " + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  1. 將生產者的類添加到上述的配置文件中
<!--注入咱們的生產者-->
    <bean class="com.hnu.scw.spring.ProduceServiceImpl"/>
  • 1
  • 2
  1. 編寫生產者的測試類
package com.hnu.scw.spring;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 2:27 2018/7/15 0015
 * @ Description:生產者的測試
 * @ Modified By:
 * @Version: $version$
 */
public class ProducerTest {
    public static void main(String[] args){
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml");
        ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class);
        //進行發送消息
        for (int i = 0; i < 100 ; i++) {
            bean.sendMessage("test" + i);
        }
        //當消息發送完後,關閉容器
        classPathXmlApplicationContext.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  1. 運行測試類,查看生產者是否產生消息成功
    這裏寫圖片描述
    經過上述的界面,就能夠看到本身配置的隊列模式的消息產生成功。
  2. 編寫消費者的消息監聽類
  3. 編寫消費者的配置文件,命名爲consumer.xml
<?xml version="1.0" encoding="UTF-8" ?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd ">

    <context:annotation-config />

    <!--Activemq的鏈接工廠-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
    <!--spring jms爲咱們提供的鏈接池 獲取一個鏈接工廠-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- 消息目的地  點對點的模式-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="SpringActiveMQMsg"/>
    </bean>

    <!-- 配置消息監聽器-->
    <bean id="consumerMessageListener" class="com.hnu.scw.spring.ComsumerMessageListener"/>
    <!--配置消息容器-->
    <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!--配置鏈接工廠-->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--配置監聽的隊列-->
        <property name="destination" ref="queueDestination"/>
        <!--配置消息監聽器-->
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  1. 消息消費者ComsumerMessageListener類代碼
package com.hnu.scw.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 3:06 2018/7/15 0015
 * @ Description:消息的監聽者,用於處理消息
 * @ Modified By:
 * @Version: $version$
 */
public class ComsumerMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接受到消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 編寫測試文件,測試消費者消費消息是否成功
package com.hnu.scw.spring;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 3:13 2018/7/15 0015
 * @ Description:消費者的測試
 * @ Modified By:
 * @Version: $version$
 */
public class ConsumerTest {
    public static void main(String[] args){
        //啓動消費者
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  1. 查看ActiveMQ網站具體消息狀況
    這裏寫圖片描述
    這裏寫圖片描述
  2. ActiveMQ的隊列模型就大功告成啦。。。。。。so easy!!!
    備註:上面都是進行的ActiveMQ的隊列模型的配置,那麼咱們若是想進行主題模型的又是如何進行操做呢?其實也很簡單,只須要修改生產者的xml文件裏面的隊列便可。好比以下代碼:
<!-- 消息目的地  (主題模式)-->
    <!--<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQTopic">
        &lt;!&ndash;配置隊列模型的消息名稱&ndash;&gt;
        <constructor-arg value="SpringActiveMQMsgTopic"/>
    </bean>-->
  • 1
  • 2
  • 3
  • 4
  • 5

將上面的代碼替換以前的就能夠了。。。
總結:總的來講,基於Spring來使用消息隊列仍是很是方便的,這比咱們正常進行JMS規範操做要簡單不少,畢竟不少對象都是經過Spring的IOC進行容器管理了,因此,值得推薦使用哦~~~
#ActiveMQ的集羣
###爲何要進行集羣呢?
緣由一:實現高可用:以排除單點故障所引發的服務終端。
緣由二:實現負載均衡:以提高效率爲更多的客戶進行服務。
###集羣的方式有哪些?
方式一:客戶端集羣:多個客戶端消費同一個隊列。
方式二:Broker clusters:多個Broker之間同步消息。(實現負載均衡)
這裏寫圖片描述
這個的實現原理主要是經過網絡鏈接器來進行。
網絡鏈接器:用於配置ActiveMQ服務器與服務器之間的網絡通信方式,用於服務器透析消息。主要分爲靜態鏈接和動態鏈接。
方式三:Master Slave :實現高可用。
這種方式的話,能夠聯想到Mysql的主從配置和Zookeeper的負載均衡的主競爭關係master。
咱們在實際的開發中,通常都是將方式二和方式三進行集成,從而實現高可用和負載均衡。下面的話,我也就這樣的配置思想來進行講解:(經過三臺服務器來模擬消息集羣的實現)
這裏寫圖片描述
其中的NodeB和NodeC就是一張Master/slave的關係。均可以成爲主服務器。(只要它們某一個宕機,那麼就會其他的一臺就進行繼續服務)
###搭建步驟(基於Windows環境,而Linux環境也是同樣的操做)
三臺服務器的大致功能和描述:
這裏寫圖片描述
因爲本身沒有三臺服務器,因此就用本身的一臺電腦來模擬三臺消息服務器,其實這個就是假設有三個不一樣ActiveMQ消息服務器了。

  1. 複製三個ActiveMQ的服務配置到一個公共目錄
    這裏寫圖片描述
  2. 修改activeMQA的配置文件
    這裏寫圖片描述
    只須要在activemq.xml添加以下內容:
<networkConnectors>
		<networkConnector name="local_network" uri ="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
	</networkConnectors>
  • 1
  • 2
  • 3
  1. 修改ActiveMQB的配置文件
    (1)首先在activemq,xml中添加以下內容:
<!--修改服務端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
	   <networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
	</networkConnectors>
<!--並修改下面這個標籤的內容 , 做爲B和C的共享文件,目錄就是本身以前建立的一個文件(能夠回看上面的整個結構)-->
<persistenceAdapter>
            <kahaDB directory="D:\Download\MQJiQun\shareDB"/>
        </persistenceAdapter>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(2)修改jetty.xml內容,修改服務器的服務端口

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8162"/>
    </bean>
  • 1
  • 2
  • 3
  • 4
  • 5
  1. 修改ActiveMQC的配置文件(其實相似和B同樣,只是服務端口不同)
    (1)修改activemq.xml中的內容
<!--修改服務端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
	   <networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
	</networkConnectors>
<!--並修改下面這個標籤的內容 , 做爲B和C的共享文件,目錄就是本身以前建立的一個文件(能夠回看上面的整個結構)-->
<persistenceAdapter>
            <kahaDB directory="D:\Download\MQJiQun\shareDB"/>
        </persistenceAdapter>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(2)修改jetty.xml中的內容

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8163"/>
    </bean>
  • 1
  • 2
  • 3
  • 4
  • 5
  1. 集羣搭建完成~~~~

集羣測試(基於IDEA編輯器+Maven)

步驟:
(1)建立Maven項目
(2)導入依賴

<!--添加activemq的依賴-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.9.0</version>
    </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(3)編寫生產者代碼

package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:06 2018/7/14 0014
 * @ Description:用於消息的建立類
 * @ Modified By:
 * @Version: $version$
 */
public class MessageProducer {
    //經過集羣的方式進行消息服務器的管理(failover就是進行動態轉移,當某個服務器宕機,
    // 那麼就進行其餘的服務器選擇,randomize表示隨機選擇)
    private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    //定義發送消息的隊列名稱
    private static final String QUEUE_NAME = "MyMessage";

    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
       //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createQueue(QUEUE_NAME);
        //建立一個生產者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //建立模擬100個消息
        for (int i = 1 ; i <= 100 ; i++){
            TextMessage message = session.createTextMessage("當前message是:" + i);
            //發送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我如今發的消息是:" + message.getText());
        }
        //關閉鏈接
        connection.close();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

(4)編寫消費者代碼

package com.hnu.scw.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:30 2018/7/14 0014
 * @ Description:消息消費者
 * @ Modified By:
 * @Version: $version$
 */
public class MessageConsumer {
    //經過集羣的方式進行消息服務器的管理(failover就是進行動態轉移,當某個服務器宕機,
    // 那麼就進行其餘的服務器選擇,randomize表示隨機選擇)
    private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    //定義發送消息的隊列名稱
    private static final String QUEUE_NAME = "MyMessage";

    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createQueue(QUEUE_NAME);
        //建立消費者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //建立消費的監聽
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("獲取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

(5)進行查看各自的服務器的消息隊列的狀況。

  1. 首先,是要確保三個ActiveMQ服務器都進行打開。分析:當三個都服務都運行以後,咱們從瀏覽器運行各自的地址,會發現:
    好比:我這裏的三個服務的地址分別以下:
  • http://127.0.0.1:8161/
  • http://127.0.0.1:8162/
  • http://127.0.0.1:8163/
    ###重點
    爲何前面兩個均可以訪問,而第三個不能夠呢?(一樣也是按照個人這樣的服務器打開方式哦。先打開的服務器A,接着B,最後C)可是,運行的時候,提示都成功了呀。。爲何爲何???
    分析:其實很簡單,我說過B和C是一種master/slave的方式,當B運行以後就得到了master的權限,那麼C服務是能夠看到是一種監聽的狀態,只有當B宕機以後,纔有可能獲取master的資源權限,因此,這時候C的地址固然就沒法訪問啦。這就是負載均衡的一種主/從服務的結構。固然,你能夠試着先打開C,再打開B,這時候效果就反過來了。歡迎嘗試哦~~~
  1. 再運行MessageProducer的類,用於產生消息。這時候,你們能夠去查看每一個服務器的地址,來觀察消息的產生狀況。個人以下:
    這裏寫圖片描述
    個人消息是產生在服務器B的裏面啦。。。。。。
  2. 再運行MessageConsumer的類,用於消費消息。這時候,一樣能夠去查看每一個服務器的地址中的消息隊列的狀況,來觀察消息的消費狀況。個人以下:
    這裏寫圖片描述
  3. 若是,咱們在生產者產生了消息以後,服務器B忽然宕機了怎麼辦怎麼辦??
    分析:其實,這時候服務器C就同樣有消息保存進行同步了。。是否是這樣就是一種高可用的架構了呢????你們,能夠試試哦。。把B服務器關掉,再去訪問服務器C的地址,就發現以下的結果。
    這裏寫圖片描述
    這時候服務器C就做爲了master,因此,相似zookeeper就是這樣的一種方式的哦。~
    ###總結
    好了,對於集羣方面的簡單使用就到這裏了。其實已經能夠根據這個進行擴展了,因此,小夥伴要好好理解這裏面的過程和做用,這樣纔可以學以至用。。。

#其餘的消息中間件
其實,相似ActiveMQ這樣的消息中間件,用得比較多的還有就是RabbitMQ和Kafka。它們三者各自有各自的優點。你們能夠百度進行了解,我就不進行多說了。後面我會一樣把這兩種消息中間件的使用進行詳細的講解,歡迎你們的關注哦~總的來講,只有適合的場景對應的消息中間件才能發揮最大的做用,沒有一種是隻有好處而沒有壞處的~
#總結

  • 主要是對消息中間件的基礎知識進行講解。
  • 主要講解ActiveMQ的使用
  • 主要講解了關於ActiveMQ的集羣的搭建
  • 稍微提到了相似ActiveMQ消息中間件的其餘中間件
  • 我所講述的內容,夠你們進行入門了,若是要進行深刻的瞭解仍是須要慢慢的去熟悉和學習的,並且消息中間件是很是重要的一個技術,但願你們去好好的瞭解。
  • 最後,感謝各位的閱讀哦~~~~
相關文章
相關標籤/搜索