深刻淺出 JMS(二) - ActiveMQ 入門指南

深刻淺出 JMS(二) - ActiveMQ 入門指南

上篇博文深刻淺出 JMS(一) – JMS 基本概念,咱們介紹了消息通訊的規範JMS,這篇博文介紹一款開源的 JMS 具體實現—— ActiveMQ。ActiveMQ 是一個易於使用的消息中間件。html

1、消息中間件和 ActiveMQ

(1) 消息中間件(MOM:Message Orient middleware)

咱們簡單的介紹一下消息中間件,對它有一個基本認識就好,消息中間件有不少的用途和優勢:java

  1. 將數據從一個應用程序傳送到另外一個應用程序,或者從軟件的一個模塊傳送到另一個模塊;
  2. 負責創建網絡通訊的通道,進行數據的可靠傳送;
  3. 保證數據不重發,不丟失;
  4. 可以實現跨平臺操做,可以爲不一樣操做系統上的軟件集成技工數據傳送服務。

(2) ActiveMQ

首先簡單的介紹一下 MQ,MQ 英文名 MessageQueue,中文名也就是你們用的消息隊列,幹嗎用的呢,說白了就是一個消息的接受和轉發的容器,可用於消息推送。web

下面進入咱們今天的主題,爲你們介紹 ActiveMQ。apache

Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.
Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4.

ActiveMQ 是由 Apache 出品的,一款最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持 JMS1.1 和 J2EE 1.4 規範的 JMS Provider 實現,它很是快速,支持多種語言的客戶端和協議,並且能夠很是容易的嵌入到企業的應用環境中,並有許多高級功能。後端

下面咱們下載一個版本,玩一玩。瀏覽器

2、運行 ActiveMQ 服務

(1) 下載,解壓縮

ActiveMQ 下載網站:http://activemq.apache.org/activemq-5153-release.html安全

你們如今好以後,將 apache-activemq-5.15.3-bin.zip 解壓縮,咱們能夠看到它的總體目錄結構:服務器

圖2.1 activemq目錄結構

從它的目錄來講,仍是很簡單的:網絡

  1. bin:存放的是腳本文件
  2. conf:存放的是基本配置文件
  3. data:存放的是日誌文件
  4. docs:存放的是說明文檔
  5. examples:存放的是簡單的實例
  6. lib:存放的是 activemq 所需 jar 包
  7. webapps:用於存放項目的目錄

(2) 啓動 ActiveMQ

咱們瞭解 activemq 的基本目錄,下面咱們運行一下 activemq 服務,雙擊 bin 目錄下的 bin/win64/activemq.bat 腳本文件,就能夠看下圖的效果。session

圖2.2 activemq啓動

從上圖咱們能夠看到 activemq 的存放地址,以及瀏覽器要訪問的地址。

(3) 測試

ActiveMQ 默認使用的 TCP 鏈接端口是 61616, 經過查看該端口的信息能夠測試 ActiveMQ 是否成功啓動

netstat -an | find "61616" 

TCP     0.0.0.0:61616     0.0.0.0:0       LISTENING

(4) 監控

ActiveMQ 默認啓動時,啓動了內置的 jetty 服務器,提供一個用於監控 ActiveMQ 的 admin 應用。地址:http://127.0.0.1:8161/admin/

用戶名和密碼都是 admin

圖2.4 activemq監控

3、ActiveMQ 特性列表

  1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 徹底支持 JMS1.1 和 J2EE 1.4 規範 (持久化,XA消息,事務)
  3. 對 Spring 的支持,ActiveMQ 能夠很容易內嵌到使用 Spring 的系統裏面去,並且也支持 Spring2.0 的特性
  4. 經過了常見 J2EE 服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過 JCA 1.5 resource adaptors 的配置,可讓 ActiveMQ 能夠自動的部署到任何兼容 J2EE 1.4 商業服務器上
  5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持經過 JDB C和 journal 提供高速的消息持久化
  7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點
  8. 支持 Ajax
  9. 支持與 Axis 的整合
  10. 能夠很容易得調用內嵌 JMS provider,進行測試

4、什麼狀況下使用 ActiveMQ

  1. 多個項目之間集成

    • 跨平臺
    • 多語言
    • 多項目
  2. 下降系統間模塊的耦合度,解耦

    • 軟件擴展性
  3. 系統先後端隔離

    • 先後端隔離,屏蔽高安全區

5、ActiveMQ 快速入門

們首先寫一個簡單的 Hello World 示例,讓你們感覺下 Activemq,咱們須要實現接受者和發送者兩部分代碼的編寫。

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.3</version>
</dependency>

 (1) 生產者

public class Producer {

    public static void main(String[] args) throws JMSException {
        //1. 建立 ConnectionFactory 鏈接工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory( // (1)
                //ActiveMQConnectionFactory.DEFAULT_USER,
                //ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "admin", "password",
                "tcp://localhost:61616/"
        );

        //2. 建立 connection,並啓動鏈接
        Connection connection = factory.createConnection();  // (2)
        connection.start();

        //3. Session 是一個發送或接收消息的線程
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  // (3)

        //4. 指定生產消息目標禾消費消息來源的 Destination 對象
        Destination dest = session.createQueue("queue1");  // (4)

        //5. 建立生產者
        MessageProducer producer = session.createProducer(dest);  // (5)

        //6. 指定簽收模式
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  // (6)

        //7. 建立消息
        for (int i = 0; i < 10; i++) {  // (7)
            TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa");
            producer.send(message);
        }

        if (connection != null) {
            connection.close();
        }
    }
}
  1. 建立 ConnectionFactory 實例,ActiveMQConnectionFactory 構造函數傳入三個參數,分別是用戶名,密碼,消息地址,tcp 端口能夠在 conf/activemq.xml 中配製。

  2. 建立 connection,並啓動鏈接,Connection 默認是關閉的。注意:使用結束後要關閉 connection.close()

  3. 建立 session 會話,一個 connection 能夠建立多個 session,Session 是一個發送或接收消息的線程。

    • 參數1:是否開啓事務,若是開啓事務,則在必須提交 session

      for (int i = 0; i < 10; i++) {  // (7)
          TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa");
          producer.send(message);
      }
      session.commit();
    • 參數2:簽收模式,有 Session.AUTO_ACKNOWLEDGE(自動)、Session.CLIENT_ACKNOWLEDGE(手動 經常使用)、Session.DUPS_OK_ACKNOWLEDGE(可能重複簽收),如若選擇 Session.CLIENT_ACKNOWLEDGE,則必須在消費端確認,不然 ActiveMQ 不認爲消息已經消費。生產中通常使用 Session.CLIENT_ACKNOWLEDGE 簽收,不要相信自動簽收方式。

      # 客戶端處理
      TextMessage message = (TextMessage) consumer.receive();
      message.acknowledge();
  4. 經過 session 建立 Destination 對象,指定生產消息目標禾消費消息來源的對象。在 PTP 模式中 Destination 被稱做 Queue 即隊列;在 Pub/sub 模式 Destination 被稱做 Topic 即主題,在程序中可使用多個 Queue 和 Top。

  5. 建立消息的發送和接收對象(生產者和消貴者) MessageProducer/MessageConsumer

  6. 指定簽收模式。使用 MessageProducer 的 setDeliveryMode 方法爲其設置持久化特性和非持久化特性(DeliveryMode)

  7. 發送消息。使用 JMS 規範的 TextMessage 形式建立數據(經過 session 對象),而且 MessageProducer 的 send 方法發送數據。同理客戶端使用 receive 方法進行接收數據。最後不要忘記關閉 Connection 鏈接。

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)

    • destination 能夠指定將不一樣的消息發送到不一樣的 destination,但消費端就必須指定對應的 destination

    • deliveryMode 是否開啓持久化,默認爲持久性,DeliveryMode.NON_PERSISTENTDeliveryMode.PERSISTENT

    • priority 優先級 0-9,默認爲 4,優先級越高越先消費,機率

    • timeToLive ActiveMQ 中消息保留的時間,單位秒,默認永久保存

(2) 消費者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616/"
        );

        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

        Destination dest = session.createQueue("queue1"); // (1)

        MessageConsumer consumer = session.createConsumer(dest);

        while (true) {
            TextMessage message = (TextMessage) consumer.receive();
            //message.acknowledge(); 
            if (message == null)
                break;
            System.out.println(message.getText());
        }

        if (connection != null) {
            connection.close();
        }
    }
}
  1. 消費端與生產者大同小異,注意對應的參數最好設定爲一致。必須從對應的 Destination 取出數據,不然沒法取到數據。
相關文章
相關標籤/搜索