消息隊列學習基礎

什麼是MOM

MOM 就是面向消息中間件(Message-oriented middleware),是用於以分佈式應用或系統中的異步、鬆耦合、可靠、可擴展和安全通訊的一類軟件。MOM 的整體思想是它做爲消息發送器和消息接收器之間的消息中介,這種中介提供了一個全新水平的鬆耦合。
前端

MOM思想就是A和B兩個應用程序不直接發送消息。以前A和B直接發送消息有不少效率問題,如A發送以後B沒有及時接受,那麼A就一直再在那裏堵塞併發性很差,A必須等B接受完以後有返結果了A才能夠結束。而MOM就是爲了解決這樣的問題,不讓A與B之間交互,在A和B之間加一個消息中間件,A把消息放到消息中間上,就能夠走了,去作別的事情,B何時來消息中間件取消息A不用知道也不用管。這樣就提升了效率提供併發性,等B去走後能夠經過狀態,通知,回調等方式通知A就能夠。市面上實現這種思想的技術有不少,IBM(MQSEVICES)、Microsoft(MSMQ)以及BEA的MessageMQ等。處於百家爭鳴階段都是各自實現各自的,沒有統一實現標準。此時SUN爲了實現統一標準就出現了JMS統一實現規範。JMS主要有2種消息模型,點到點和發佈訂閱兩種。數據庫

什麼是消息隊列

消息隊列是在消息的傳輸過程當中保存消息的容器,用於接收消息並以文件的方式存儲,一個消息隊列能夠被一個也能夠被多個消費者消費。編程

消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合、異步消息、流量削鋒等問題。實現高性能、高可用、可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。安全

目前在生產環境,使用較多的消息隊列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。bash

消息隊列優勢

  1. 將數據從一個應用程序傳到另外一個應用程序,或者從軟件的一個模塊傳送到另一個模塊
  2. 負責創建網絡通訊的通道,進行數據的可靠傳送
  3. 保證數據不重發,不丟失
  4. 可以實現跨平臺操做,可以爲不一樣操做系統上的軟件集成技工數據傳送服務

消息隊列的應用場景

下面詳細介紹一下消息隊列在實際應用中經常使用的使用場景。場景分爲異步處理、應用解耦、流量削鋒和消息通信四個場景。
服務器

異步處理

場景說明 用戶註冊後,須要發送註冊郵件和發送註冊信息,傳統的作法有兩種:串行方式並行方式網絡

串行方式session

將註冊信息寫入數據庫成功後,發送註冊郵件,而後發送註冊短信,而全部任務執行完成後,返回信息給客戶端
架構


並行方式併發

將註冊信息寫入數據庫成功後,同時進行發送註冊郵件和發送註冊短信的操做。而全部任務執行完成後,返回信息給客戶端。同串行方式相比,並行方式能夠提升執行效率,減小執行時間。

上面的比較能夠發現,假設三個操做均須要50ms的執行時間,排除網絡因素,則最終執行完成,串行方式須要150ms,而並行方式須要100ms。

由於cpu在單位時間內處理的請求數量是一致的,假設:CPU每1秒吞吐量是100此,則串行方式1秒內可執行的請求量爲1000/150,不到7次;並行方式1秒內可執行的請求量爲1000/100,爲10次。

由上能夠看出,傳統串行和並行的方式會受到系統性能的侷限,那麼如何解決這個問題?
咱們須要引入消息隊列,將不是必須的業務邏輯,異步進行處理,由此改造出來的流程爲


根據上述的流程,用戶的響應的時間基本至關於將數據寫入數據庫的時間,發送註冊郵件,發送註冊短信的消息在寫入消息隊列後,便可返回執行結果,寫入消息隊列的時間很快,幾乎能夠忽略,也有此能夠將系統吞吐量提高至20QPS,比串行方式提高近3倍,比並行方式提高2倍。

應用解耦

場景說明 用戶下單後,訂單系統須要通知庫存系統。

傳統的作法爲:訂單系統調用庫存系統的接口。以下圖所示:


傳統方式具備以下缺點:
  1. 假設庫存系統訪問失敗,則訂單減小庫存失敗,致使訂單建立失敗
  2. 訂單系統同庫存系統過分耦合

如何解決上述的缺點呢?須要引入消息隊列,引入消息隊列後的架構以下圖所示:


引入消息隊列,實現應用解耦
  • 訂單系統:用戶下單後,訂單系統進行數據持久化處理,而後將消息寫入消息隊列,返回訂單建立成功
  • 庫存系統:使用拉/推的方式,獲取下單信息,庫存系統根據訂單信息,進行庫存操做。

假如在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其後續操做了。由此實現了訂單系統與庫存系統的應用解耦。

流量削鋒

流量削峯 也是消息對列中的經常使用場景,通常在秒殺或團搶活動中使用普遍。

應用場景 秒殺活動,通常會由於流量過大,致使流量暴增,應用掛掉。爲解決這個問題,通常須要在應用前端加入消息隊列。

  1. 能夠控制參與活動的人數;
  2. 能夠緩解短期內高流量對應用的巨大壓力;

流量削鋒處理方式系統圖以下:


流量削鋒方式系統圖
  1. 服務器在接收到用戶請求後,首先寫入消息隊列。這時若是消息隊列中消息數量超過最大數量,則直接拒絕用戶請求或返回跳轉到錯誤頁面;
  2. 秒殺業務根據秒殺規則讀取消息隊列中的請求信息,進行後續處理。

日誌處理

日誌處理是指將消息隊列用在日誌處理中,好比Kafka的應用,解決大量日誌傳輸的問題。

日誌處理是指將消息隊列用在日誌處理中,好比Kafka的應用,解決大量日誌傳輸的問題。架構簡化以下:


消息隊列應用於日誌處理的架構
  • 日誌採集客戶端:負責日誌數據採集,定時寫受寫入Kafka隊列;
  • Kafka消息隊列:負責日誌數據的接收,存儲和轉發;
  • 日誌處理應用:訂閱並消費kafka隊列中的日誌數據;

這種架構在實際開發中的應用,能夠參照案例:新浪技術分享:咱們如何扛下32億條實時日誌的分析處理

服務的技術架構設計

  1. Kafka:接收用戶日誌的消息隊列。
  2. Logstash:作日誌解析,統一成JSON輸出給Elasticsearch。
  3. Elasticsearch:實時日誌分析服務的核心技術,一個schemaless,實時的數據存儲服務,經過index組織數據,兼具強大的搜索和統計功能。
  4. Kibana:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是衆多公司選擇ELK stack的重要緣由。

消息通信

消息通信是指,消息隊列通常都內置了高效的通訊機制,所以也能夠用在純的消息通信。好比實現點對點消息隊列、聊天室等。

點對點通信

點對點通信架構設計

在點對點通信架構設計中,客戶端A和客戶端B共用一個消息隊列,便可實現消息通信功能。

聊天室通信

聊天室通信架構設計

客戶端A、客戶端B、直至客戶端N訂閱同一消息隊列,進行消息的發佈與接收,便可實現聊天通信方案架構設計。

JMS消息服務

講消息隊列就不得不提JMS。JMS(Java Message Service,Java消息服務) JMS 叫作 Java 消息服務(Java Message Service),是 Java 平臺上有關面向 MOM 的技術規範,旨在經過提供標準的產生、發送、接收和處理消息的 API 簡化企業應用的開發,相似於 JDBC 和關係型數據庫通訊方式的抽象。

API是一個消息服務的標準/規範,容許應用程序組件基於JavaEE平臺建立,發送,接收和讀取消息。他是分佈式通訊耦合度更低,消息服務更加可靠以及異步性。

在EJB架構中,有消息bean能夠無縫的與JM消息服務集成。在J2EE架構模式中,有消息服務者模式,用於實現消息與應用直接的解耦。


經常使用概念

  • Provider:純 Java 語言編寫的 JMS 接口實現(好比 ActiveMQ 就是)
  • Domains:消息傳遞方式,包括點對點(P2P)、發佈/訂閱(Pub/Sub)兩種
  • Connection factory:客戶端使用鏈接工廠來建立與 JMS provider 的鏈接
  • Destination:消息被尋址、發送以及接收的對象

消息模型

在JMS標準中,有兩種消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)

P2P 模式


P2P(點對點)模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每一個消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,知道他們被消費或者超時。

P2P 消息域使用 queue 做爲 Destination,消息能夠被同步或異步的發送和接收,每一個消息只會給一個 Consumer 傳送一次。Consumer 可使用 MessageConsumer.receive() 同步地接收消息,也能夠經過使用MessageConsumer.setMessageListener() 註冊一個 MessageListener 實現異步接收。

多個 Consumer 能夠註冊到同一個 queue 上,但一個消息只能被一個 Consumer 所接收,而後由該 Consumer 來確認消息。而且在這種狀況下,Provider 對全部註冊的 Consumer 以輪詢的方式發送消息。


P2P的特色

  1. 每一個消息只有一個消費者(Consumer)(即一旦被消費,消息就再也不在消息隊列中,其餘的消費者就不能獲得這條消息了。)
  2. 發送者和接收者質檢在時間上沒有依賴性,也就是說當發送者發送了消息以後,無論接收者有沒有正在運行,他不會影響到消息發送到隊列。
  3. 消費者必須確認對消息的接收

    收到消息後消費者必須確認消息已被接收,不然JMS服務提供者會認爲該消息沒有被接收,那麼這條消息仍然能夠被其餘人接收。程序能夠自動進行確認,不須要人工干預。

  4. 非持久的消息最多隻發送一次

    非持久的消息最多隻發送一次,表示消息有可能未被髮送,形成未被髮送的緣由可能有:

    一、 JMS服務提供者出現宕機等狀況,形成非持久信息的丟失

    二、 隊列中的消息過時,未被接收

  5.  持久的消息嚴格發送一次

    咱們能夠將比較重要的消息設置爲持久化的消息,持久化後的消息不會由於JMS服務提供者的故障或者其餘緣由形成消息丟失。

若是但願發送的每一個消息都會被成功處理的話,那麼須要p2p 模式

Pub/Sub模式



包含三個角色:主題(Topic),發佈者(Publisher),訂閱者(Subscriber)。多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

Pub/Sub(發佈/訂閱,Publish/Subscribe)消息域使用 topic 做爲 Destination,發佈者向 topic 發送消息,訂閱者註冊接收來自 topic 的消息。發送到 topic 的任何消息都將自動傳遞給全部訂閱者。接收方式(同步和異步)與 P2P 域相同。

除非顯式指定,不然 topic 不會爲訂閱者保留消息。固然,這能夠經過持久化(Durable)訂閱來實現消息的保存。這種狀況下,當訂閱者與 Provider 斷開時,Provider 會爲它存儲消息。當持久化訂閱者從新鏈接時,將會受到全部的斷連期間未消費的消息。

Pub/Sub的特色

  • 每一個消息均可以有多個(0,1,……)訂閱者,每條消息能夠有多個消費者,若是報紙和雜誌同樣,誰訂閱了誰均可以得到。

  • 發佈者和訂閱者之間有時間上的依賴性。訂閱者只能消費他們訂閱以後出版的消息,針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者以後,才能消費發佈者的消息。這就要求訂閱者必須先訂閱,生產者再發布。即訂閱者必須先運行,再等待生產者的運行,這和點對點類型有所差別。
  • 爲了消費消息,訂閱者必須保持運行的狀態。即訂閱者必須保持活動狀態等待發布者發佈的消息,若是訂閱者在發佈者發佈消息以後才運行,則不能得到先前發佈者發佈的消息。

爲了緩和這樣嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱。這樣,即便訂閱者沒有被激活(運行),它也能接收到發佈者的消息。
若是但願發送的消息能夠不被作任何處理、或者只被一個消息者處理、或者能夠被多個消費者處理的話,那麼能夠採用Pub/Sub模型。

消息消費

在JMS中,消息的產生和消費都是異步的。對於消費來講,JMS的消息者能夠經過兩種方式來小消費消息。

  1. 同步
    訂閱者或接收者經過receive方法來接受消息,receive在接收到消息以前(或超時以前)將一直阻塞。
  2. 異步
    訂閱者或接收者亦能夠註冊未一個消息監聽器。當消息到達以後,系統自動調用監聽器的onMessage的方法。
JDNI:Java命名和目錄接口,是一種標準的Java命名系統接口。能夠在網絡上查找和訪問服務。經過指定一個資源名稱,該名稱對應於數據庫或者命名服務中的一個記錄,同時返回資源鏈接創建所必需的信息。

JNDI在JMS中起到查找二號訪問發送目標或消息來源的做用。

JMS編程

JMS通用步驟

  • 獲取鏈接工廠
  • 使用鏈接工廠建立鏈接
  • 啓動鏈接
  • 從鏈接建立會話
  • 獲取 Destination
  • 建立 Producer,或
    • 建立 Producer
    • 建立 message
  • 建立 Consumer,或發送或接收message發送或接收 message
    • 建立 Consumer
    • 註冊消息監聽器(可選)
  • 發送或接收 message
  • 關閉資源(connection, session, producer, consumer 等)

JMS編程模型

1.ConnectionFactory

建立Connection對象的工廠,針對兩週不一樣的JMS消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。能夠經過JNDI來查找ConnectionFactory對象。

2.Destination

Destination的意思是消息生產者的消息發送目標或着說消息消費者的消息來源。對於消息生產者來講。他的Destination是某個隊列(queue)或者某個主題(Topic);對於消息消費者來講,他的Destination也是某個隊列或主題(即消息來源)。

因此,Destination實際上就是兩種類型的對象:Queue,Topic能夠經過JNDI來查找Destination

3.Connection

Connection表示在客戶端和JMS系統之間創建的連接(對TCP/IP Socket的包裝)。Connection能夠產生一個或多個Session。跟ConnectionFactory同樣,Connection也有兩種類型:QueueConnection和TopicConnection。

4.Session

Session是操做消息的接口。能夠經過session建立生產者、消費者、消息等。Session提供了事務的功能。當須要使用session發送/接收多個消息時,能夠將這些發送/接收動做放到一個事務中。一樣,也分QueueSession和TopicSession。

5.消息的生產者

消息生產者由Session建立,並用於將消息發送到Destination。一樣,消息生產者分兩種類型:QueueSender和TopicPublisher。能夠調用消息生產者的方法(send或publish方法)發送消息。

6.消息消費者

消息消費者由Session建立,用於接收被髮送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別經過session的createReceiver(Queue)或createSubscriber(Topic)來建立。固然,也能夠session的creatDurableSubscriber方法來建立持久化的訂閱者。

7. MessageListener

消息監聽器。若是註冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

深刻學習JMS對掌握JAVA架構、EJB架構有很好的幫助,消息中間件也是大型分佈式系統必須的組件。本次分享主要作全局性介紹,具體的深刻須要你們學習,實踐,總結,領會。

JMS編程實戰

這裏拿ActiveMQ 舉例

public class JMSDemo {
        ConnectionFactory connectionFactory;
        Connection connection;
        Session session;
        Destination destination;
        MessageProducer producer;
        MessageConsumer consumer;
        Message message;
        boolean useTransaction = false;
        try {
                Context ctx = new InitialContext();
                connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
                //使用ActiveMQ時:connectionFactory = new ActiveMQConnectionFactory(user, password, getOptimizeBrokerUrl(broker));
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("TEST.QUEUE");
                //生產者發送消息
                producer = session.createProducer(destination);
                message = session.createTextMessage("this is a test");

                //消費者同步接收
                consumer = session.createConsumer(destination);
                message = (TextMessage) consumer.receive(1000);
                System.out.println("Received message: " + message);
                //消費者異步接收
                consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                                if (message != null) {
                                        doMessageEvent(message);
                                }
                        }
                });
        } catch (JMSException e) {
                ...
        } finally {
                producer.close();
                session.close();
                connection.close();
        }
}複製代碼
相關文章
相關標籤/搜索