消息中間件ActiveMQ使用詳解

消息中間件ActiveMQ使用詳解

1、消息中間件的介紹

介紹

消息隊列 是指利用 高效可靠消息傳遞機制 進行與平臺無關的 數據交流,並基於 數據通訊 來進行分佈式系統的集成。html

特色(做用)

  • 應用解耦
  • 異步通訊
  • 流量削峯
  • (海量)日誌處理
  • 消息通信
  • …...

應用場景

根據消息隊列的特色,能夠衍生出不少場景,或者說不少場景都能用到。下面舉幾個例子:java

1)異步通訊docker

​ 註冊時的短信、郵件通知,減小響應時間;apache

2)應用解耦編程

​ 信息發送者和消息接受者無需耦合,好比調用第三方;服務器

3)流量削峯微信

​ 例如秒殺系統;網絡

2、消息中間件的對比

1.ActiveMQ

官網:activemq.apache.org/session

簡介:架構

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。

特色:

  1. 支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各類跨語言客戶端和協議

  2. 徹底支持JMS客戶端和Message Broker中的企業集成模式

  3. 支持許多高級功能,如消息組,虛擬目標,通配符和複合目標

  4. 徹底支持JMS 1.1和J2EE 1.4,支持瞬態,持久,事務和XA消息

  5. Spring支持,以便ActiveMQ能夠輕鬆嵌入到Spring應用程序中,並使用Spring的XML配置機制進行配置

  6. 專爲高性能集羣,客戶端 - 服務器,基於對等的通訊而設計

  7. CXF和Axis支持,以便ActiveMQ能夠輕鬆地放入這些Web服務堆棧中以提供可靠的消息傳遞

  8. 能夠用做內存JMS提供程序,很是適合單元測試JMS

  9. 支持可插拔傳輸協議,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA傳輸

  10. 使用JDBC和高性能日誌支持很是快速的持久性

2.RabbitMQ

官網:www.rabbitmq.com/

簡介:

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。RabbitMQ輕巧且易於部署在雲端。 它支持多種消息傳遞協議。 RabbitMQ能夠部署在分佈式和聯合配置中,以知足高規模,高可用性需求。RabbitMQ可運行在許多操做系統和雲環境中,併爲大多數流行語言提供普遍的開發工具。(來自官網翻譯)

AMQP (Advanced MessageQueue):高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。

RabbitMQ最初普遍應用於金融行業,根據官網描述,它具備以下特色:

特色:

  1. 異步消息傳遞:支持多種消息協議,消息隊列,傳送確認,靈活的路由到隊列,多種交換類型;
  2. 支持幾乎全部最受歡迎的編程語言:Java,C,C ++,C#,Ruby,Perl,Python,PHP等等;
  3. 能夠部署爲高可用性和吞吐量的集羣; 跨多個可用區域和區域進行聯合;
  4. 可插入的身份驗證,受權,支持TLS和LDAP。;
  5. 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面;
  6. 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。

3. Kafka

官網:kafka.apache.org/

簡介:

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣來提供實時的消息。

Kafka它主要用於處理活躍的流式數據,所以Kafaka在大數據系統中使用較多。

特色:

  1. 同時爲發佈和訂閱提供高吞吐量。據瞭解,Kafka每秒能夠生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。

  2. 可進行持久化操做。將消息持久化到磁盤,所以可用於批量消費,例如ETL,以及實時應用程序。經過將數據持久化到硬盤以及replication防止數據丟失。

  3. 分佈式系統,易於向外擴展。全部的producer、broker和consumer都會有多個,均爲分佈式的。無需停機便可擴展機器。

  4. 消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。

  5. 支持online和offline的場景。

4. RocketMQ

官網:rocketmq.apache.org/

簡介:

RocketMQ是阿里開源的消息中間件,目前在Apache孵化,使用純Java開發,具備高吞吐量、高可用性、適合大規模分佈式系統應用的特色。RocketMQ思路起源於Kafka,但並非簡單的複製,它對消息的可靠傳輸及事務性作了優化,目前在阿里集團被普遍應用於交易、充值、流計算、消息推送、日誌流式處理、binglog分發等場景,支撐了阿里屢次雙十一活動。

特色:

  1. 支持發佈/訂閱(Pub/Sub)和點對點(P2P)消息模型
  2. 在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
  3. 支持拉(pull)和推(push)兩種消息模式
  4. 單一隊列百萬消息的堆積能力
  5. 支持多種消息協議,如 JMS、MQTT 等
  6. 分佈式高可用的部署架構,知足至少一次消息傳遞語義
  7. 提供 docker 鏡像用於隔離測試和雲集羣部署
  8. 提供配置、指標和監控等功能豐富的 Dashboard

3、ActiveMQ的安裝

1.安裝步驟

activemq在各個系統下都有對應的安裝包。如下來演示Linux系統下安裝activemq。

進入apache-activemq-5.15.8/bin目錄,啓動activemq./activemq start

輸出以上信息,表示啓動成功。

2.安裝遇到的問題

在安裝過程當中,經過查看activemq的運行狀態,

顯示以上。

經過./bin/activemq console 命令查看運行日誌:

主機名中包含非法字符;

那麼解決辦法就很簡單了,改主機名:

一、方法一使用hostnamectl命令

hostnamectl set-hostname 主機名

二、方法二:修改配置文件 /etc/hostname 保存退出

修改完成以後重啓便可,這裏我使用的是方法一:

hostnamectl set-hostname activemq

查看運行狀態:

5、ActiveMQ頁面介紹

待ActiveMQ安裝啓動好,訪問http://ip:8161/admin,登陸名和密碼都是admin(在配置文件中可修改),進入ActiveMQ的主頁:

下面來介紹每一個菜單的功能:

1.Queue消息隊列頁面

Name:消息隊列的名稱。

Number Of Pending Messages:未被消費的消息數目。

Number Of Consumers:消費者的數量。

Messages Enqueued:進入隊列的消息 ;進入隊列的總消息數目,包括已經被消費的和未被消費的。 這個數量只增不減。

Messages Dequeued:出了隊列的消息,能夠理解爲是被消費掉的消息數量。在Queues裏它和進入隊列的總數量相等(由於一個消息只會被成功消費一次),若是暫時不等是由於消費者還沒來得及消費。

2.Topic主題頁面

Name:主題名稱。

Number Of Pending Messages:未被消費的消息數目。

Number Of Consumers:消費者的數量。

Messages Enqueued:進入隊列的消息 ;進入隊列的總消息數目,包括已經被消費的和未被消費的。 這個數量只增不減。

Messages Dequeued:出了隊列的消息,能夠理解爲是被消費掉的消息數量。在Topics裏,由於多消費者從而致使數量會比入隊列數高。

3.Subscribers查看訂閱者頁面

查看訂閱者信息,只在Topics消息類型中這個頁面纔會有數據。

4.Connections查看鏈接數頁面

6、簡單使用

引入jar包:

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

1.點對點(P2P)模型

​ 點對點模型,採用的是隊列(Queue)做爲消息載體。在該模式中,一條消息只能被一個消費者消費,沒有被消費的,只能留在隊列中,等待被消費,或者超時。舉個例子,若是隊列中有10條消息,有兩個消費者,就是一個消費者消費5條信息,你一條我一條。如下以代碼演示。

消息發佈者:

public static void main(String[] args) throws JMSException {
    /*
     * 實現步驟
     * 1.創建ConnectionFactory工廠對象,須要填入用戶名、密碼、鏈接地址(通常使用默認,若是沒有修改的話)
     * 2.經過ConnectionFactory對象建立一個Connection鏈接,而且調用Connection的start方法開啓鏈接,Connection方法默認是關閉的
     * 3.經過Connection對象建立Session會話(上下文環境對象),用於接收消息,參數1是是否啓用事物,參數2是簽收模式,通常設置爲自動簽收
     * 4.經過Session對象建立Destination對象,指的是一個客戶端用來制定生產消息目標和消費消息來源的對象。在PTP的模式中,Destination被稱做隊列,在Pub/Sub模式中,Destination被稱做主題(Topic)
     * 5.經過Session對象建立消息的發送和接收對象(生產者和消費者)
     * 6.經過MessageProducer的setDeliverMode方法爲其設置持久化或者非持久化特性
     * 7.使用JMS規範的TextMessage形式建立數據(經過Session對象),並用MessageProducer的send方法發送數據。客戶端同理。記得關閉
     */
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
            ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("queue");
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    for (int i=0;i<=5;i++) {
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("我是第"+i+"消息");
        producer.send(textMessage);
    }
    if(connection!=null){
        connection.close();
    }
}

消息消費者:

public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("queue");
        MessageConsumer consumer = session.createConsumer(destination);
        while (true){
            TextMessage message = (TextMessage) consumer.receive();
            if (message==null){
                break;
            }
            System.out.println(message.getText());
        }
        if(connection!=null){
            connection.close();
        }
    }

先啓動兩個消費者,在啓動發佈者:

2.發佈/訂閱(Pub/Sub)模型

發佈/訂閱模型採用的是主題(Topic)做爲消息通信載體。該模式相似微信公衆號的模式。發佈者發佈一條信息,而後將該信息傳遞給全部的訂閱者。注意:訂閱者想要接收到該信息,必須在該信息發佈以前訂閱。

發佈者發佈信息:

public static void main(String[] args) throws JMSException, IOException {
        // 建立一個ConnectionFactory對象鏈接MQ服務器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616");
        // 建立一個鏈接對象
        Connection connection;
        connection = connectionFactory.createConnection();
        // 開啓鏈接
        connection.start();
        // 使用Connection對象建立一個Session對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 建立一個Destination對象。topic對象
        Topic topic = session.createTopic("test-topic");
        // 使用Session對象建立一個消費者對象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // 打印結果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println("這是接收到的消息:" + text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
        System.out.println("topic消費者啓動。。。。");
        // 等待接收消息
        System.in.read();
        // 關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

訂閱者訂閱信息:

public static void main(String[] args) throws JMSException {
        // 一、建立一個鏈接工廠對象,須要指定服務的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616");
        // 二、使用工廠對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 三、開啓鏈接,調用Connection對象的start方法。
        connection.start();
        // 四、建立一個Session對象。
        // 第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。
        // 第二個參數:應答模式。自動應答或者手動應答。通常自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用topic
        Topic topic = session.createTopic("test-topic");
        // 六、使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(topic);
        // 七、建立一個Message對象,可使用TextMessage。
        for (int i = 0; i < 50; i++) {
            TextMessage textMessage = session.createTextMessage("第" + i + "一個ActiveMQ隊列目的地的消息");
            // 八、發送消息
            producer.send(textMessage);
        }
        // 九、關閉資源
        producer.close();
        session.close();
        connection.close();
    }

訂閱者要提早訂閱,因此先運行訂閱者。

3.兩種模式對比

1)由以上,咱們能夠總結出ActiveMQ的實現步驟:

  • 創建ConnectionFactory工廠對象,須要填入用戶名、密碼、鏈接地址
  • 經過ConnectionFactory對象建立一個Connection鏈接
  • 經過Connection對象建立Session會話
  • 經過Session對象建立Destination對象;在P2P的模式中,Destination被稱做隊列(Queue),在Pub/Sub模式中,Destination被稱做主題(Topic)
  • 經過Session對象建立消息的發送和接收對象
  • 發送消息
  • 關閉資源

2)能夠看出,P2P模式和Pub/Sub模式,在實現上的區別是經過Session建立的Destination對象不同,在P2P的模式中,Destination被稱做隊列(Queue),在Pub/Sub模式中,Destination被稱做主題(Topic)

7、參考

  1. https://www.jianshu.com/p/0363ac9ff574
  2. https://juejin.im/post/5adaaae351882567356415eb
相關文章
相關標籤/搜索