分佈式消息通訊ActiveMQ

消息中間件

消息中間件是指利用高效可靠的消息傳遞機制進行平臺無關的數據交流,而且基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,能夠在分佈式架構下擴展進程之間的通訊。html

消息中間件能作什麼

消息中間件主要解決分佈式系統之間消息的傳遞問題 ,可以屏蔽各類平臺以及協議之間的特性,實現應用之間的協同。java

示例:spring

電商平臺中的註冊功能,用戶註冊不單是向數據庫insert,可能還須要贈送積分,發送郵件,發送短信等系列操做。數據庫

假如:每一個操做都耗時1s,那麼註冊過程就須要耗時4s才能響應給用戶。從註冊這個服務能夠看出,每一個子操做都是獨立的,同時,基於領域劃分之後,它們都屬於不一樣的子域。因此咱們能夠對這些子操做實現異步化操做。相似多線程並行處理。apache

如何實現異步化?用多線程能實現嗎?多線程固然能夠實現,只是,消息的持久化、消息的重發這些條件,多線程 並不能知足.因此須要藉助一些開源的消息中間件來解決。 而分佈式消息隊列就是一個很好的解決辦法。經過引入分佈式隊列,大大提高程序的處理效率,而且還解決了各個模塊之間的耦合問題。服務器

分佈式消息隊列解決的場景:session

引入消息中間件後(異步處理),電商平臺中的註冊架構圖變爲多線程

電商中的秒殺:架構

 

用戶提交過來的請求,先寫入消息隊列。消息隊列是有長度的,若是消息隊列超過指定長度,直接拋棄。異步

秒殺的 具體核心處理業務,接收消息隊列中消息進行處理。這裏的消息處理能力取決於消費端自己的吞吐量。

解耦、異步化、流量整形、數據的最終一致性(最大化的重試完成數據一致性)

ActiveMQ 簡介

ActiveMQ

ActiveMQ 是徹底基於JMS 規範實現的一個消息中間件產品,是Apache 開源基金會研發的消息中間件。ActiveMQ 主要應用在分佈式系統架構中,幫助構建高可用、高性能、可伸縮的企業級面向服務的系統。

ActiveMQ 特性

  • 多語言和協議編寫客戶端

    • 語言:Java、C、C++、C#、Ruby、Perl、Python、PHP

    • 協議:openwire、stomp、REST、ws、notification、xmpp、AMQP

  • 徹底支持JMS1.1和J2EE1.4規範

  • 對Spring的支持,ActiveMQ能夠很容易的嵌入到spring模塊中

ActiveMQ 下載安裝啓動

下載地址

http://activemq.apache.org/activemq-5158-release.html

解壓

tar -zxvf apache-activemq-5.15.8-bin.tar.gz

啓動服務

  • cd apache-activemq-5.15.8/bin

    sh activemq start

  • 啓動並帶指定日誌文件 sh activemq start > /tmp/activemqlog

關閉服務

  • sh activemq stop

監控地址

http://192.168.15.134:8161/admin/ admin admin

ActiveMQ 的端口61616

  • 默認爲61616

  • 檢查是否成功啓動ActiveMQ

    • netstat -an|grep 61616

JMS 基本概念和模型

JMS的定義

JMS(Java Message Service) :面向消息中間件的API

MOM(Message Oriented Middleware):面向消息中間件

Java 消息服務是Java平臺中關於面向消息中間件的API,用於兩個程序 之間,或者分佈式系統中發送消息,進行異步通訊。

JMS 是一個與具體平臺無關的API,絕大多數MOM 提供商都對JMS提供了支持。ActiveMQ就是其中的一個實現。

MOM

MOM 是面向消息的中間件,使用消息傳送提供者來協調消息傳送操做。 MOM 須要提供API和管理工具。客戶端使用API調用,把消息發送到由提供者管理的目的地。在發送消息後,客戶端會繼續執行其餘工做,而且在接收方收到這個消息確認以前,提供者一直保留該消息。

MOM 的特色

  • 消息異步接收,發送者不須要等待消息接受者響應

  • 消息可靠接收,確保消息中間件可靠保存。只有接收方收到消息後才刪除消息

開源JMS提供商

JbossMQ(jboss4)、Jboss messaging(jboss5)、joram、ubermq、mantamq、openjms ...

JMS 規範

JMS 規範的目的是爲了使得Java 應用程序可以訪問現有MOM(消息中間件)系統,造成一套統一的標準規範,解決不一樣消息中間件之間的協做問題。

  • 不一樣消息的傳遞域,點對點消息傳送和發佈/訂閱消息傳送

  • 提供接收同步和異步消息的工具

  • 對可靠消息傳送的支持

  • 常見消息格式,例如流、文本和字節

JMS 的體系結構

 

JMS 的基本功能

JMS 的基本功能是用於和麪向消息中間件相互通訊的應用程序的接口

消息傳遞域

  • p2p(point-2-point) 點對點消息傳遞域

    • 每一個消息只能有一個消費者(離線存儲)

      • 相似QQ聊天的私聊

    • 生產者和消費者之間沒有時間上的相關性,不管消費者在生產者發送消息的時候是否處於運行狀態,均可以提取消息

    • 若是session關閉時,有一些消息已經被收到,可是沒有被簽收,消費者下一次鏈接到相同對列時,這些消息仍然會被接收

    • 若是用戶在receive 方法中設定了消息的選擇條件(消息過濾)

    • 若是是持久化消息,消息會被持久化保存,直到消息被簽收

  • 發佈訂閱(publish/subscribe)消息傳遞域

    • 每一個消息有多個消費者

      • 相似QQ羣聊

    • 生產者和消費者有時間上的相關性

      • 訂閱一個主題的消費者只能消費自它訂閱以後發佈的消息。

      • JMS 規範容許客戶建立持久訂閱,必定程度上下降了時間的相關性要求

      • 持久訂閱容許消費者消費它在未處於激活狀態時發送的消息

    • 持久化訂閱和非持久化訂閱

    • 在非持久化訂閱的前提下,不能恢復或者從新指派一個未簽收的消息;

    • 若是全部消息必需要簽收,則使用持久訂閱

消息的組成

消息頭(Header)

消息頭包含消息的識別信息和路由信息

消息頭包含一些標準的屬性:

  • JMSDestination

    • 消息發送的目的地,queue或者topic

  • JMSDeliveryMode

    • 傳送模式,持久化模式和非持久模式

  • JMSPrority

    • 消息優先級(優先級分爲10個級別,從0最低-9最高)

    • 若是不設定優先級,默認級別4,須要注意的是,JMS Provider 並不必定保證按照優先級的順序提交

  • JMSMessageID

    • 惟一識別每一個消息的標識

消息體

就是咱們須要傳遞的消息的內容

JMS API定義了5種消息體格式:

  • TextMessage

    • java.lang.String 對象,如xml文件內容

  • MapMessage

    • 名/值對的集合,名是String 對象,值能夠是Java 任何基本類型

  • BytesMessage

    • 字節流

  • StreamMessage

    • Java 中的輸入輸出流

  • ObjectMessage

    • Java 中的可序列化對象

  • Message

    • 沒有消息體,只有消息頭和屬性

消息的屬性

按類型分爲:

  • 應用設置的屬性

    • Message.setStringProperty(key,value);

  • 標準屬性

    • 使用「JMSX」 做爲屬性名的前綴

  • 消息中間件定義的屬性

    • JMS Provider 特定的屬性

JMS 的可靠機制

消息的確認方式

消息的處理階段:

  • 客戶端接收消息

  • 客戶端處理消息

  • 消息被確認

會話存在兩種機制:

  • 事務性會話

    • createSession(boolean transacted, int acknowledgeMode)

      • Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

    • session.commit() //消息被確認 事務提交意味着生產的全部消息被髮送,消費的全部消息被確認

    • session.rollback(); //從新處理 消息沒有被提交,沒有被處理,消費端的全部消息被恢復,而且從新被提交, 表示一個事務結束, 另外一個事務會開始。事務回滾意味着生產的全部消息被銷燬,消費的全部消息 被恢復並從新提交,除非它們已通過期

    • 經過session.commit() //完成事務的簽收

  • 非事務性會話

    • transacted 設置爲FALSE

    • Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

    • 客戶端簽收模型

    • Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

    • 那麼須要手動簽收

    • textMessage.acknowledge();

    • 客戶端延遲確認,消息可能重複消費

      • Session session = connection.createSession(Boolean.FALSE, DUPS_OK_ACKNOWLEDGE);

事務性的自動確認

非事務性的自動確認和手動確認

消息的持久化存儲

持久化(存儲在數據庫或磁盤)

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

對於持久消息,消息提供者會使用存儲-轉發機制,先將消息存儲到穩定的介質中,等消息發送成功後再刪除。若是JMS Provider 宕機,那麼這些未送達的消息則不會丟失,JMS Provider 恢復正常後,會從新讀取這些消息,並傳送給對應的消費者。

非持久化(存儲在內存中)

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

對於非持久化消息,JMS Provider 不會將它存到文件、數據庫等穩定介質中。也就是說非持久消息,存儲在內存中,若是JMS Provider 宕機,那麼非持久化消息會丟失。

持久訂閱

  • 持久訂閱者和非持久訂閱者針對的Domain 是Pub/Sub,而不是P2P

  • 當Broker 發送消息給訂閱者時,若是訂閱者處於未激活狀態,持久訂閱者能夠收到消息,而非持久訂閱者則收不到消息。

  • 當持久訂閱者處於未激活狀態時,Broker 須要爲持久訂閱者保存消息,若是持久訂閱者訂閱的消息太多則會溢出。

  • 持久訂閱時,客戶端向JMS 服務器註冊一個本身身份的ID, 當這個客戶端處於離線時,JMS Provider 會爲這個ID 保存全部發送到主題的消息,當客戶再次鏈接到 JMS Provider時,會根據本身的ID獲得全部當本身處於離線時發送到主題的消息。

  • 持久訂閱的方式(消費端)

    • connection.setClientID("test");

    • Topic destination=session.createTopic("myTopic");

    • MessageConsumer consumer=session.createDurableSubscriber(destination,"test");

JMS 規範結合ActiveMQ 實現消息發送

案例架構圖

 

示例代碼

引入Jar 包

 <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.8</version>
 </dependency> 

生產端

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JMSQueueProducer {
    public static void main(String args[]) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.15.134:61616");
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //建立目的地
            Destination destination = session.createQueue("myQueue");
            //建立發送者
            MessageProducer producer = session.createProducer(destination);
            //持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMessage = session.createTextMessage("Hello,World");
            producer.send(textMessage);
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 消費端

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import javax.xml.soap.Text;
public class JMSQueueConsumer {

    public static void main(String args[]) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.15.134:61616");
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            //建立目的地
            Destination destination = session.createQueue("myQueue");
            //建立接收者
            MessageConsumer  consumer = session.createConsumer(destination);
            //接收消息    阻塞方式監聽消息
            TextMessage textMessage =(TextMessage) consumer.receive();
            System.out.println(textMessage.getText());
            session.commit(); //表示消息被自動確認
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null)
            {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
相關文章
相關標籤/搜索