MQ介紹 & 實例

閱讀目錄php

 

定義:html

消息隊列(MQ)是一種應用程序對應用程序的通訊方法,應用程序經過隊列進行通訊,而不是經過直接調用彼此來通訊,隊列的使用除去了接收和發送應用程序同時執行的要求。是進行通訊的中間件產品。(換言之:MQ負責兩個系統之間傳遞消息,這兩個系統能夠是異構的,處於不一樣硬件、不一樣操做系統、用不一樣語言編寫,只須要簡單的調用幾個MQ的API,就能夠互相通信,你沒必要考慮底層系統和網絡的複雜性。MQ可以應付多種異常狀況,例如網絡阻塞、臨時中斷等等)java

PS:直接調用一般是用於諸如遠程過程調用的技術。python

補充知識:MB(消息路由、數據轉換)web

優秀MQ特色spring

  a>.高可用性,但願MQ能支撐7x24小時應用,而不是三天兩頭當機,要作到高可用性,就須要作MQ的集羣,一臺當了,不影響整個集羣的服務能力,這裏涉及到告警、流控、消息的負載均衡、數據庫的使用、測試的完備程度等等。數據庫

  b>.消息存儲的高可靠性。要保證100%不丟消息。這不只僅是MQ的責任,更涉及到硬件、操做系統、語言平臺和數據庫的一整套方案。許多號稱可靠存儲的MQ產品其實都不可靠,要知道,硬件錯誤是常態,若是在硬件錯誤的狀況下還能保證消息的可靠存儲這纔是難題。這裏可能須要用到特殊的存儲硬件,特殊的數據庫,分佈式的數據存儲,數據庫的分庫分表和同步等等。你要考慮消息存儲在哪裏,是文件系統,仍是數據庫,是本地文件,仍是分佈式文 件,是搞主輔備份呢仍是多主機寫入等等。apache

  c>.高可擴展性,MQ集羣能很好地支持水平擴展,這就要求咱們的節點之間最好不要有通訊和數據同步。編程

  d>.性能,性能是實現高可用性的前提,很難想象單機性能極差的MQ組成的集羣能在高負載下倖免於難。性能因素跟採用的平臺、語言、操做系統、代碼質量、數據 庫、網絡息息相關。MQ產品的核心實際上是消息的存儲,在保證存儲安全的前提下如何保證和提升消息入隊的效率是性能的關鍵因素。這裏須要開發人員創建起性能觀念,不須要你對一行行代碼斤斤計較,可是你須要知道這樣作會形成什麼後果,有沒有更好更快的方式,你怎麼證實它更好更快。軟件實現的性能是一方面,另外一 方面就是平臺相關了,由於MQ本質上是IO密集型的系統,瓶頸在IO,如何優化網絡IO、文件IO這須要專門的知識。性能另外一個相關因素是消息的調度上, 引入消息順序和消息優先級,容許消息的延遲發送,都將增大消息發送調度的複雜性,如何保證高負載下的調度也是要特別注意的地方。api

  e>.高可配置性和監控工具的完整,這是一個MQ產品容易忽略的地方。異步通訊形成了查找問題的難度,不像同步調用那樣有相對明確的時序關係。所以查找異步通訊 的異常是很困難的,這就須要MQ提供方便的DEBUG工具,查找分析日誌的工具,查看消息生命週期的工具,查看系統間依賴關係的工具等等。可定製也是MQ 產品很是重要的一方面,可方便地配置各種參數並在集羣中同步,而且可動態調整各種參數,這將大大下降維護難度。

Ps:一句話總結:全天候不宕機,安全消息存儲,100%不丟失數據高效率的寫入讀出,同時要求方便查錯

產品比較 

RabbitMQ

是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的很是重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味着消息在發送給客戶端時先在中 心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。

Ps: 結合erlang語言自己的併發優點,性能較好,可是不利於作二次開發和維護

Redis

是一個Key-Value的NoSQL數據庫,開發維護很活躍,雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能,因此徹底可 以當作一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲 128Bytes、512Bytes、1K和10K四個不一樣大小的數據。實驗代表:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而如 果數據大小超過了10K,Redis則慢的沒法忍受;出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於 Redis。

 

                     入隊

                     出隊

 

128B

512B

1K

10K

128B

512B

1K

10K

Redis

16088

15961

17094

25

15955

20449

18098

9355

RabbitMQ

10627

9916

9370

2366

3219

3174

2982

1588

Ps: 作爲一個基於內存的K-V數據庫,其提供了消息訂閱的服務,能夠看成MQ來使用,目前應用案例較少,且不方便擴展

ZeroMQ

號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技 術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你不須要安裝和運行一個消息服務器或中間件,由於你的 應用程序將扮演了這個服務角色。你只須要簡單的引用ZeroMQ程序庫,可使用NuGet安裝,而後你就能夠愉快的在應用程序之間發送消息了。可是 ZeroMQ僅提供非持久性的隊列,也就是說若是down機,數據將會丟失。其中,Twitter的Storm中使用ZeroMQ做爲數據流的傳輸。

Ps: 擴展性好,開發比較靈活,採用C語言實現,實際上他只是一個socket庫的從新封裝,若是咱們作爲消息隊列使用,須要開發大量的代碼

ActiveMQ

是Apache下的一個子項目。 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。RabbitMQ、 ZeroMQ、ActiveMQ均支持經常使用的多種語言客戶端 C++、Java、.Net,、Python、 Php、 Ruby等。

Ps: 歷史悠久的開源項目,已經在不少產品中獲得應用,實現了JMS1.1規範,能夠和spring-jms輕鬆融合,實現了多種協議,不夠輕巧(源代碼比RocketMQ多).支持持久化到數據庫,對隊列數較多的狀況支持很差.

Jafka/Kafka

Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式Publish/Subscribe消息隊列系統,而Jafka是在Kafka 之上孵化而來的,即Kafka的一個升級版。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既能夠 達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現複雜均衡;支持 Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過 Hadoop的並行加載機制來統一了在線和離線的消息處理,這一點也是本課題所研究系統所看重的。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。

RocketMQ

阿里巴巴的MQ中間件,在其多個產品下使用,並可以撐住雙十一的大流量,他並無實現JMS規範,使用起來很簡單。部署由一個 命名服務(nameserver)和一個代理(broker)組成,nameserver和broker以及producer都支持集羣,隊列的容量受機器硬盤的限制,隊列滿後能夠支持持久化到硬盤(也能夠本身適配代碼,將其持久化到NOSQL數據庫中),隊列滿後會影響吞吐量,能夠採用主備來保證穩定性,支持回溯消費,能夠在broker端進行消息過濾.

其餘一些隊列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就再也不一一分析。

比較ActiveMQ and RocketMQ

  RocketMQ ActiveMQ   
優先級 須要新建一個特殊隊列來接收優先級高的隊列,沒法實現從0-65535這種細粒度的控制 能夠精細控制  
順序 能夠保證嚴格的消費順序 沒法保證嚴格的順序  
持久化 支持 支持  
穩定性 更高    
消息過濾 RocketMQ能夠在broker端進行過濾,對於咱們的消息總線,這裏能夠節省大量的網絡傳輸是否會有消息重發形成的重複消費:RocketMQ能夠保證,ActiveMQ沒法保證 僅支持在客戶端消費的時候進行判斷是不是本身須要的消息  
回溯消費 支持 不支持 即從新將某一個時刻以前的消息從新消費一遍
事務 支持 支持  
定時消費 支持 不支持  
消息堆積 更優   就是當緩存消息的內存滿了以後的解決方案,一種是丟棄策略,這種不會影響吞吐量,還有一種就是將消息持久化到磁盤,這種會影響吞吐量
客戶端不在線 RocketMQ能夠在客戶端上線後繼續將未消費的消息推送到客戶端    

比較主流的MQ:

  ActiveMQ RabbitMQ RocketMq ZeroMQ
關注度  
成熟度   成熟 成熟 比較成熟 不成熟
所屬社區/公司 Apache  Mozilla Public License Alibaba      
社區活躍度  
文檔  
特色   功能齊全,被大量開源項目使用 因爲Erlang 語言的併發能力,性能很好    各個環節分佈式擴展設計,主從 HA;支持上萬個隊列;多種消費模式;性能很好 低延時,高性能,最高 43萬條消息每秒  
受權方式   開源 開源 開源 開源
開發語言   Java Erlang   Java   C
支持的協議   OpenWire、STOMP、REST、XMPP、AMQP AMQP   本身定義的一套(社區提供JMS--不成熟) TCP、UDP
客戶端支持語言   Java、C、C++、Python、PHP、Perl、.net 等  Java、C、C++、Python、 PHP、Perl 等 Java、C++(不成熟)   python、 java、 php、.net 等
持久化   內存、文件、數據庫 內存、文件 磁盤文件 在消息發送端保存
事務   支持 不支持 支持 不支持
集羣   支持 支持 支持 不支持
負載均衡 支持 支持 支持 不支持
管理界面   通常 無社區有 webconsole   實現
部署方式   獨立、嵌入 獨立 獨立 獨立
評價   優勢:成熟的產品,已經在不少公司獲得應用(非大規模場景)。有較多的文檔。各類協議支持較好,有多重語言的成熟的客戶端;
缺點:根據其餘用戶反饋,會出莫名其妙的問題,切會丟失消息。其重心放到activemq6.0 產品—apollo上去了,目前社區不活躍,且對5.x維護較少;
Activemq不適合用於上千個隊列的應用場景
優勢:因爲erlang語言的特性,mq性能較好;管理界面較豐富,在互聯網公司也有較大規模的應用;支持amqp系誒,有多中語言且支持amqp的客戶端可用
缺點:erlang語言難度較大。集羣不支持動態擴展。
優勢:模型簡單,接口易用(JMS的接口不少場合並不太實用)。在阿里大規模應用。目前支付寶中的餘額寶等新興產品均使用rocketmq。集羣規模大概在50 臺左右,單日處理消息上百億;性能很是好,能夠大量堆積消息在broker中;支持多種消費,包括集羣消費、廣播消費等。開發度較活躍,版本更新很快。
缺點:沒有在mq核心中去實現JMS等接口,
 

 

 實例(簡單的實戰)

   ActiveMQ入門實例:

    1.去官方網站下載:http://activemq.apache.org/
    2.運行ActiveMQ 解壓縮apache-activemq-5.8.0-bin.zip,而後雙擊apache-activemq-5.5.1\bin\activemq.bat運行ActiveMQ程序。
      啓動ActiveMQ之後,登錄:http://localhost:8161/admin/,建立一個Queue,命名爲FirstQueue。
    3.建立Eclipse項目並運行
    建立java project:ActiveMQ,新建lib文件夾,導入以下Jar包

    activemq-broker-5.8.0.jar 、activemq-client-5.8.0.jar 、geronimo-j2ee-management_1.1_spec-1.0.1.jar 、geronimo-jms_1.1_spec-1.1.1.jar 、slf4j-api-1.6.6.jar

    建立類以下類:
    

 

複製代碼
import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.DeliveryMode;  
import javax.jms.Destination;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Sender {  
    private static final int SEND_NUMBER = 5;  
  
    public static void main(String[] args) {  
        // ConnectionFactory :鏈接工廠,JMS 用它建立鏈接  
        ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS  
        // Provider 的鏈接  
        Connection connection = null; // Session: 一個發送或接收消息的線程  
        Session session; // Destination :消息的目的地;消息發送給誰.  
        Destination destination; // MessageProducer:消息發送者  
        MessageProducer producer; // TextMessage message;  
        // 構造ConnectionFactory實例對象,此處採用ActiveMq的實現jar  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
        try { // 構造從工廠獲得鏈接對象  
            connection = connectionFactory.createConnection();  
            // 啓動  
            connection.start();  
            // 獲取操做鏈接  
            session = connection.createSession(Boolean.TRUE,  
                    Session.AUTO_ACKNOWLEDGE);  
            // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            // 獲得消息生成者【發送者】  
            producer = session.createProducer(destination);  
            // 設置不持久化,此處學習,實際根據項目決定  
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            // 構造消息,此處寫死,項目就是參數,或者方法獲取  
            sendMessage(session, producer);  
            session.commit();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
  
    public static void sendMessage(Session session, MessageProducer producer)  
            throws Exception {  
        for (int i = 1; i <= SEND_NUMBER; i++) {  
            TextMessage message = session.createTextMessage("ActiveMq 發送的消息"  + i);  
            // 發送消息到目的地方  
            System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);  
            producer.send(message);  
        }  
    }  
}  
複製代碼

 

 

複製代碼
import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.MessageConsumer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Receiver {  
    public static void main(String[] args) {  
        // ConnectionFactory :鏈接工廠,JMS 用它建立鏈接  
        ConnectionFactory connectionFactory;  
        // Connection :JMS 客戶端到JMS Provider 的鏈接  
        Connection connection = null;  
        // Session: 一個發送或接收消息的線程  
        Session session;  
        // Destination :消息的目的地;消息發送給誰.  
        Destination destination;  
        // 消費者,消息接收者  
        MessageConsumer consumer;  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
        try {  
            // 構造從工廠獲得鏈接對象  
            connection = connectionFactory.createConnection();  
            // 啓動  
            connection.start();  
            // 獲取操做鏈接  
            session = connection.createSession(Boolean.FALSE,  
                    Session.AUTO_ACKNOWLEDGE);  
            // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            consumer = session.createConsumer(destination);  
            while (true) {  
                // 設置接收者接收消息的時間,爲了便於測試,這裏誰定爲100s  
                TextMessage message = (TextMessage) consumer.receive(100000);  
                if (null != message) {  
                    System.out.println("收到消息" + message.getText());  
                } else {  
                    break;  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
}  
複製代碼

 

 

IBM WebSphere MQ介紹安裝以及配置服務詳解(連接)

 

關於消息隊列與分佈式的那些事

  消息隊列技術是分佈式應用間交換信息的一種技術。消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走。經過消息隊列,應用程序可獨立地執行--它們不須要知道彼此的位置、或在繼續執行前不須要等待接收程序接收此消息。在分佈式計算環境中,爲了集成分佈式應用,開發者須要對異構網絡環境下的分佈式應用提供有效的通訊手段。爲了管理須要共享的信息,對應用提供公共的信息交換機制是重要的。

 

設計分佈式應用的方法主要有:

遠程過程調用(PRC)--分佈式計算環境(DCE)的基礎標準成分之一;

對象事務監控(OTM)--基於CORBA的面向對象工業標準與事務處理(TP)監控技術的組合;

消息隊列(MessageQueue)--構造分佈式應用的鬆耦合方法。

 


  (a) 分佈計算環境/遠程過程調用 (DCE/RPC)

 

  RPC是DCE的成分,是一個由開放軟件基金會(OSF)發佈的應用集成的軟件標準。RPC模仿一個程序用函數引用來引用另外一程序的傳統程序設計方法,此引用是過程調用的形式,一旦被調用,程序的控制則    轉向被調用程序。

 

  在RPC實現時,被調用過程可在本地或遠地的另外一系統中駐留並在執行。當被調用程序完成處理輸入數據,結果放在過程調用的返回變量中返回到調用程序。RPC完成後程序控制則當即返回到調用程序。所以    RPC模仿子程序的調用/返回結構,它僅提供了Client(調用程序)和Server(被調用過程)間的同步數據交換。

 


  (b) 對象事務監控 (OTM)

 

  基於CORBA的面向對象工業標準與事務處理(TP)監控技術的組合,在CORBA規範中定義了:使用面向對象技術和方法的體系結構;公共的Client/Server程序設計接口;多平臺間傳輸和翻譯數據的指導方     針;開發分佈式應用接口的語言(IDL)等,併爲構造分佈的Client/Server應用提供了普遍及一致的模式。

 


  (c) 消息隊列 (Message Queue)

 

  消息隊列爲構造以同步或異步方式實現的分佈式應用提供了鬆耦合方法。消息隊列的API調用被嵌入到新的或現存的應用中,經過消息發送到內存或基於磁盤的隊列或從它讀出而提供信息交換。消息隊列可用    在應用中以執行多種功能,好比要求服務、交換信息或異步處理等。

 

中間件是一種獨立的系統軟件或服務程序,分佈式應用系統藉助這種軟件在不一樣的技術之間共享資源,管理計算資源和網絡通信。它在計算機系統中是一個關鍵軟件,它能實現應用的互連和互操做性,能保證    系統的安全、可靠、高效的運行。中間件位於用戶應用和操做系統及網絡軟件之間,它爲應用提供了公用的通訊手段,而且獨立於網絡和操做系統。中間件爲開發者提供了公用於全部環境的應用程序接口,當    應用程序中嵌入其函數調用,它即可利用其運行的特定操做系統和網絡環境的功能,爲應用執行通訊功能。

 

若是沒有消息中間件完成信息交換,應用開發者爲了傳輸數據,必需要學會如何用網絡和操做系統軟件的功能,編寫相應的應用程序來發送和接收信息,且交換信息沒有標準方法,每一個應用必須進行特定的編程從而和多平臺、不一樣環境下的一個或多個應用通訊。例如,爲了實現網絡上不一樣主機系統間的通訊,將要求具有在網絡上如何交換信息的知識(好比用TCP/IP的socket程序設計);爲了實現同一主機內不一樣進程之間的通信,將要求具有操做系統的消息隊列或命名管道(Pipes)等知識。

 

  目前中間件的種類不少,如交易管理中間件(如IBM的CICS)、面向Java應用的Web應用服務器中間件(如IBM的WebSphere Application Server)等,而消息傳輸中間件(MOM)是其中的一種。它簡化了應用之間數據的傳輸,屏蔽底層異構操做系統和網絡平臺,提供一致的通信標準和應用開發,確保分佈式計算網絡環境下可靠的、跨平臺的信息傳輸和數據交換。它基於消息隊列的存儲-轉發機制,並提供特有的異步傳輸機制,可以基於消息傳輸和異步事務處理實現應用整合與數據交換。

 

 

出處:https://www.cnblogs.com/swugogo/p/5910128.html

相關文章
相關標籤/搜索