架構設計:系統間通訊(21)——ActiveMQ的安裝與使用

一、前言

以前咱們經過兩篇文章(架構設計:系統間通訊(19)——MQ:消息協議(上)架構設計:系統間通訊(20)——MQ:消息協議(下))從理論層面上爲你們介紹了消息協議的基本定義,並花了較大篇幅向讀者介紹了三種典型的消息協議:XMPP協議、Stomp協議和AMQP協議。本小節開始,咱們基於以前的知識點講解這些協議在具體的「消息隊列中間件」中是如何被咱們操做的。因爲本人在實際工做中常用ActiveMQ和RabbitMQ,因此就選取這兩個「消息隊列中間件」進行講解。若是讀者能夠補充其餘「消息隊列中間件」的使用,那固然是再好不過了。html

二、ActiveMQ的安裝和使用

ActiveMQ是Apache軟件基金會的開源產品,支持AMQP協議、MQTT協議(和XMPP協議做用相似)、Openwire協議和Stomp協議等多種消息協議。而且ActiveMQ完整支持JMS API接口規範(固然Apache也提供多種其餘語言的客戶端,例如:C、C++、C#、Ruby、Perl)。java

2-一、ActiveMQ的安裝

在本文發佈之時,ActiveMQ最新的版本號是5.13.2(版本號升級很快,不過並不推薦使用最新的版本)。由ActiveMQ的安裝是很簡單,因此這個過程並不值得咱們花很大篇幅進行討論。具體的過程就是:下載->解壓->配置環境變量->運行:linux

  • 下載軟件

您能夠Apache ActiveMQ的官網下載安裝包:https://activemq.apache.org/download-archives.html。這裏咱們示例在CentOS下的安裝過程,因此下載Linux下的壓縮包便可(http://www.apache.org/dyn/closer.cgi?path=/activemq/5.13.2/apache-activemq-5.13.2-bin.tar.gz)。web

  • 解壓安裝

將下載的安裝包放置在root用戶的home目錄內,解壓便可(固然您能夠根據本身的須要加壓到不一樣的文件路徑下)。以下所示:數據庫

[root@localhost ~]# tar -zxvf ./apache-activemq-5.13.2-bin.tar.gz

以上解壓使用的是root用戶,這是爲了演示方便。正式環境中仍是建議禁用root用戶,爲activeMQ的運行專門建立一個用戶和用戶組。apache

  • 配置環境變量(不是必須的)

若是您只是在測試環境使用Apache ActiveMQ,以便熟悉消息中間件自己的特性和使用方式。那麼您無需對解壓後的軟件進行任何配置,全部可運行的命令都在軟件安裝目錄的./bin目錄下。爲了使用方便,最好配置一下環境變量,以下所示(注意,根據您本身的軟件安裝位置,環境變量的設置是不同的,請不要盲目粘貼複製):瀏覽器

設置該次會話的環境變量:
[root@localhost ~]# export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;

永久設置環境變量:
[root@localhost ~]# echo "export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;" >> /etc/profile

在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 針對操做系統進行了更深刻的優化,因此您能夠看到./bin目錄下,有一個針對32位Linux運行命令的./linux-x86-32目錄,和針對64位Linux運行命令的./linux-x86-64目錄。請按照您本身的狀況進行環境變量設置和命令運行。ruby

  • 運行程序

如今您能夠在任何目錄,運行activemq命令了。注意activemq命令一共有6個參數(console | start | stop | restart | status | dump),啓動Apache ActiveMQ使用的命令是activemq start:性能優化

[root@localhost ~]# activemq start

若是啓動成功,就能夠在瀏覽器上訪問服務節點在8161端口的管理頁面了(例如http://localhost:8161):服務器

這裏寫圖片描述

點擊‘manage ActiveMQ broker’鏈接,能夠進入管理主界面(默認的用戶和密碼都是admin)。以上就是Apache ActiveMQ消息中間件最簡的安裝和運行方式。在後續的文章中,咱們會陸續討論ActiveMQ的集羣和高性能優化,那時會介紹對應的ActiveMQ的配置問題。

2-二、ActiveMQ的其餘命令參數

如同上文講到的,activemq命令除了start參數用於啓動activemq程序之外,還有另外5個參數可使用:console | stop | restart | status | dump。他們表明的使用意義是:

  • stop:中止當前ActiveMQ節點的運行。

  • restart:從新啓動當前ActiveMQ節點。

  • status:查看當前ActiveMQ節點的運行狀態。若是當前ActiveMQ節點沒有運行,那麼將返回「ActiveMQ Broker is not running」的提示信息。注意,status命令只能告訴開發人員當前節點時中止的仍是運行的,除此以外不能從status命令獲取更多的信息。例如,ActiveMQ爲何建立Queue失敗?當前ActiveMQ使用了多少內存?而要獲取這些信息,須要使用如下參數啓動ActiveMQ節點。

  • console:使用控制檯模式啓動ActiveMQ節點;在這種模式下,開發人員能夠調試、監控當前ActivieMQ節點的實時狀況,並獲取實時狀態。

  • dump:若是您採用console模式運行ActiveMQ,那麼就可使用dump參數,在console控制檯上獲取當前ActiveMQ節點的線程狀態快照。

2-三、在ActiveMQ中傳遞Stomp消息

好吧,既然咱們已經討論過如何安裝和運行ActiveMQ,也討論了Stomp協議的組織結構,爲何咱們不當即動手試一試操做ActiveMQ承載Stomp協議的消息呢?

下面咱們使用ActiveMQ提供的JAVA 客戶端(實際上就是ActiveMQ對JMS規範的實現),向ActiveMQ中的Queue(示例代碼中將這個Queue命名爲’test’)發送一條Stomp協議消息,而後再使用JAVA語言的客戶端,從ActiveMQ上接受這條消息:

  • 使用ActiveMQ的API發送Stomp協議消息:
package mq.test.stomp;

import java.net.Socket;
import java.util.Date;

import org.apache.activemq.transport.stomp.StompConnection;

// 消息生產者
public class TestProducer {
    public static void main(String[] args) {
        try {
            // 創建Stomp協議的鏈接
            StompConnection con = new StompConnection();
            Socket so = new Socket("192.168.61.138", 61613);
            con.open(so);
            // 注意,協議版本能夠是1.2,也能夠是1.1
            con.setVersion("1.2");
            // 用戶名和密碼,這個沒必要多說了
            con.connect("admin", "admin");

            // 如下發送一條信息(您也可使用「事務」方式)
            con.send("/test", "234543" + new Date().getTime());
        } catch(Exception e) {
            e.printStackTrace(System.out);
        }
    }
}
  • 使用ActiveMQ的API接收Stomp協議消息:
package mq.test.stomp;

import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

public class TestConsumer {
    public static void main(String[] args) throws Exception {
        // 創建鏈接
        StompConnection con = new StompConnection();
        Socket so = new Socket("192.168.61.138", 61613);
        con.open(so);
        con.setVersion("1.2");
        con.connect("admin", "admin");

        String ack = "client";
        con.subscribe("/test", "client");
        // 接受消息(使用循環進行)
        for(;;) {
            StompFrame frame = null;
            try {
                // 注意,若是沒有接收到消息,
                // 這個消費者線程會停在這裏,直到本次等待超時
                frame = con.receive();
            } catch(SocketTimeoutException e) {
                continue;
            }

            // 打印本次接收到的消息
            System.out.println("frame.getAction() = " + frame.getAction());
            Map<String, String> headers = frame.getHeaders();
            String meesage_id = headers.get("message-id");
            System.out.println("frame.getBody() = " + frame.getBody());
            System.out.println("frame.getCommandId() = " + frame.getCommandId());

            // 在ack是client標記的狀況下,確認消息
            if("client".equals(ack)) {
                con.ack(meesage_id);
            }
        }
    }
}

以上分別是使用Activie提供的Stomp協議的消息生產端和Stomp協議的消息消費端的代碼(若是您不清楚Stomp協議的細節,能夠參考我另外一篇文章:《架構設計:系統間通訊(19)——MQ:消息協議(上)》)。請注意在代碼片斷中,並無出現任何一個帶有jms名稱的包或者類——這是由於ActiveMQ爲Stomp協議提供的JAVA API在內部進行了JMS規範的封裝。

您能夠查看activemq-stomp中關於協議轉換部分的源代碼:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父級接口:org.apache.activemq.transport.stomp.FrameTranslator來驗證這件事情(關於ActiveMQ對JMS規範的實現設計,若是後續有時間再回頭進行講解)。

如下是Stomp協議的消費者端的運行效果(在生產者端已經向ActiveMQ插入了一條消息以後):

frame.getAction() = MESSAGE
frame.getBody() = 2345431458460073204
frame.getCommandId() = 0

注意,因爲消息體中插入了一個時間戳,因此您複製粘貼代碼後運行效果並不會和個人演示程序徹底一致。

2-四、ActiveMQ中的Queue和Topics

若是您細心的話,在ActiveMQ提供的管理頁面上已經看到有兩個功能頁面:Queue和Topic。Queue和Topic是JMS爲開發人員提供的兩種不一樣工做機制的消息隊列。 在ActiveMQ官方的解釋是:

  • Topics

In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.

中文的能夠譯作:JMS-Topic 隊列基於「訂閱-發佈」模式,當操做者發佈一條消息後,全部對這條消息感興趣的訂閱者均可以收到它——也就是說這條消息會被拷貝成多份,進行分發。只有當前「活動的」訂閱者可以收到消息(換句話說,若是當前JMS-Topic隊列中沒有訂閱者,這條消息將被丟棄)。

  • Queue

A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.

So Queues implement a reliable load balancer in JMS.

中文的能夠譯作:JMS-Queue是一種「負載均衡模式」的實現。一個消息能且只能被一個消費者接受。若是當前JMS-Queue中沒有任何的消費者,那麼這條消息將會被Queue存儲起來(實際應用中能夠存儲在磁盤上,也能夠存儲在數據庫中,看軟件的配置),直到有一個消費者鏈接上。另外,若是消費者在接受到消息後,在他斷開與JMS-Queue鏈接以前,沒有發送ack信息(能夠是客戶端手動發送,也能夠是自動發送),那麼這條消息將被髮送給其餘消費者。

如下表格摘自互聯網上的資料,基本上把Queue和Topic這兩種隊列的不一樣特性說清楚了:

比較項目 Topic 模式隊列 Queue 模式隊列
工做模式 「訂閱-發佈」模式,若是當前沒有訂閱者,消息將會被丟棄。若是有多個訂閱者,那麼這些訂閱者都會收到消息 「負載均衡」模式,若是當前沒有消費者,消息也不會丟棄;若是有多個消費者,那麼一條消息也只會發送給其中一個消費者,而且要求消費者ack信息。
有無狀態 無狀態 Queue數據默認會在mq服務器上以文件形式保存,好比Active MQ通常保存在$AMQ_HOME\data\kr-store\data下面。也能夠配置成DB存儲。
傳遞完整性 若是沒有訂閱者,消息會被丟棄 消息不會丟棄
處理效率 因爲消息要按照訂閱者的數量進行復制,因此處理性能會隨着訂閱者的增長而明顯下降,而且還要結合不一樣消息協議自身的性能差別 因爲一條消息只發送給一個消費者,因此就算消費者再多,性能也不會有明顯下降。固然不一樣消息協議的具體性能也是有差別的

2-五、JMS和協議間轉換

上文已經說到,JMS這套面向消息通訊的 JAVA API 是一個和廠商無關的規範。經過JMS,咱們能實現不一樣消息中間件廠商、不一樣協議間的轉換和交互。這一小節咱們就來討論一下這個問題。若是用一張圖來表示JMS在消息中間件中的做用話,那麼就能夠這麼來畫:

這裏寫圖片描述

首先您使用的MQ消息中間件須要實現了JMS規範;那麼經過JMS規範,開發人員能夠忽略各類消息協議的細節,只要消息在同一隊列中,就可以保證各類消息協議間實現互相轉換。下面咱們首先來看一個使用JMS API在ActiveMQ中操做openwire協議消息的簡單示例,而後再給出一個經過JMS,實現Stomp消息協議和Openwire消息協議間的互轉示例。

2-5-一、JMS操做

  • 如下代碼使用向某個Queue(命名爲test)中發送一條消息:
package jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/** * 測試使用JMS API鏈接ActiveMQ * @author yinwenjie */
public class JMSProducer {
    /** * 因爲是測試代碼,這裏忽略了異常處理。 * 正是代碼可不能這樣作 * @param args * @throws RuntimeException */
    public static void main (String[] args) throws Exception {
        // 定義JMS-ActiveMQ鏈接信息(默認爲Openwire協議)
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.61.138:61616");
        Session session = null;
        Destination sendQueue;
        Connection connection = null;

        //進行鏈接
        connection = connectionFactory.createQueueConnection();
        connection.start();

        //創建會話(設置一個帶有事務特性的會話)
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        //創建queue(固然若是有了就不會重複創建)
        sendQueue = session.createQueue("/test");
        //創建消息發送者對象
        MessageProducer sender = session.createProducer(sendQueue);
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText("這是發送的消息內容");

        //發送(JMS是支持事務的)
        sender.send(outMessage);
        session.commit();

        //關閉
        sender.close();
        connection.close();
    }
}

當以上代碼運行到「start」的位置時,咱們能夠經過觀察ActiveMQ管理界面中connection列表中的鏈接信息,發現消息生產者已經創建了一個Openwire協議的鏈接:

這裏寫圖片描述

從而肯定咱們經過JMS API創建了一個openwire協議的通信鏈接。接着咱們使用如下代碼,創建一個基於openwire協議的「消費者」。注意:消息生產者和消息消費者,映射的隊列必須一致。(在示例代碼中,它們都映射名稱爲test的JMS-Queue)

  • 如下代碼使用JMS從某個Queue中接收消息:
package jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import javax.jms.Session;


import org.apache.activemq.ActiveMQConnectionFactory;
/** * 測試使用JMS API鏈接ActiveMQ * @author yinwenjie */
public class JMSConsumer {
    /** * 因爲是測試代碼,這裏忽略了異常處理。 * 正是代碼可不能這樣作 * @param args * @throws RuntimeException */
    public static void main (String[] args) throws Exception {
        // 定義JMS-ActiveMQ鏈接信息
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.61.138:61616");
        Session session = null;
        Destination sendQueue;
        Connection connection = null;

        //進行鏈接
        connection = connectionFactory.createQueueConnection();
        connection.start();

        //創建會話(設置爲自動ack)
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創建Queue(固然若是有了就不會重複創建)
        sendQueue = session.createQueue("/test");
        //創建消息發送者對象
        MessageConsumer consumer = session.createConsumer(sendQueue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message arg0) {
                // 接收到消息後,不須要再發送ack了。
                System.out.println("Message = " + arg0);
            }
        });

        synchronized (JMSConsumer.class) {
            JMSConsumer.class.wait();
        }

        //關閉
        consumer.close();
        connection.close();
    }
}

當以上「消費者」代碼運行到start的位置時,咱們經過ActiveMQ提供的管理界面能夠看到,基於Openwire協議的鏈接增長到了兩條:

這裏寫圖片描述

注意,您在運行以上測試代碼時,不用和個人運行順序一致。因爲Queue模式的隊列是要進行消息狀態保存的,因此不管您是先運行「消費者」端,仍是先運行「生產者」端,最後「消費者」都會收到一條消息。相似以下的效果:

Message = ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId = ID:yinwenjie-240-60482-1458616972423-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:yinwenjie-240-60482-1458616972423-1:1:1:1, destination = queue:///test, transactionId = TX:ID:yinwenjie-240-60482-1458616972423-1:1:1, expiration = 0, timestamp = 1458617840154, arrival = 0, brokerInTime = 1458617840166, brokerOutTime = 1458617840187, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@66968df8, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 這是發送的消息內容}

2-5-二、協議間轉換

下面咱們將Openwire協議的消息經過JMS送入Queue隊列,而且讓基於Stomp協議的消費者接收到這條消息。爲了節約篇幅,基於Openwire協議的生產者的代碼請參考上一小節2-5-1中「生產者」的代碼片斷。這裏只列出Stomp消息的接受者代碼(實際上這段代碼在上文中也能夠找到):

  • Stomp協議的消息消費者(消息接收者):
package mq.test.stomp;

import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

public class TestConsumer {
    public static void main(String[] args) throws Exception {
        // 創建鏈接(注意,Stomp協議的鏈接端口是61613)
        StompConnection con = new StompConnection();
        Socket so = new Socket("192.168.61.138", 61613);
        con.open(so);
        con.setVersion("1.2");
        con.connect("admin", "admin");

        String ack = "client";
        con.subscribe("/test", "client");
        // 接受消息(使用循環進行)
        for(;;) {
            StompFrame frame = null;
            try {
                // 注意,若是沒有接收到消息,
                // 這個消費者線程會停在這裏,直到本次等待超時
                frame = con.receive();
            } catch(SocketTimeoutException e) {
                continue;
            }

            // 打印本次接收到的消息
            System.out.println("frame.getAction() = " + frame.getAction());
            Map<String, String> headers = frame.getHeaders();
            String meesage_id = headers.get("message-id");
            System.out.println("frame.getBody() = " + frame.getBody());
            System.out.println("frame.getCommandId() = " + frame.getCommandId());

            // 在ack是client模式的狀況下,確認消息
            if("client".equals(ack)) {
                con.ack(meesage_id);
            }
        }
    }
}

當您同時運行Openwire消息發送者和Stomp消息接收者時,您能夠在ActiveMQ的管理界面看到這兩種協議的鏈接信息:

這裏寫圖片描述

如下是Stomp協議消費者接收到的消息內容(通過轉換的openwire協議消息):

frame.getAction() = MESSAGE
frame.getBody() = 這是發送的消息內容
frame.getCommandId() = 0

接下文

相關文章
相關標籤/搜索