ActiveMQ(三)——理解和掌握JMS

1、JMS基本概念服務器

  • JMS是什麼
    JMS Java Message Service,Java消息服務,是JavaEE中的一個技術。
  • JMS規範
    JMS定義了Java中訪問消息中間件的接囗,並無給予實現,實現JMS接囗的消息中間件稱爲JMS Provider,例如ActiveMQ
    JMS provider:實現JMS接囗和規範的消息中間件
    JMS message:JMS的消息,JMS消息由如下三部分組成:
    1:消息頭:每一個消息頭字段都有相應的getter和setter方法
    2消息屬性:若是須要除消息頭字段以夕卜的值,那麼可使用消息屬性
    3:消息體:分裝具體的消息數據
  • JMS producer:消息生產者,建立和發送JMS消息的客戶端應用
  • JMS consumer:消息消費者,接收和處理JMS消息的客戶端應用
    消息的消費能夠採用如下兩種方法之一
    1:同步消費:經過調用消費者的receive方法從目的地中顯式提取消息,receive方法能夠一直阻塞到消息到達。
    2:異步消費:客戶能夠爲消費者註冊一個消息監聽器,以定義在消息達到時所採起的動做
  • JMS domains.消息傳遞域,JMS規範中定義了兩種消息傳遞域:點對點(point-to-point,簡寫成PTP)消息傳遞域和發佈/訂閱消息傳遞域(publish/subscrmbe,簡寫成pub/sub)
    1:點對點消息傳遞域的特色以下
    (1)每一個消息只能有一個消費者
    (2)消息的生產者和消費者沒有時間上的相關性。不管消費者在生產者發送消
    息的時候是否處於運行狀態,它均可以提取消息。
    ActiveMQ(三)——理解和掌握JMS
    2:發佈/訂閱消傳遞域的特色以下.
    (1)每一個消息能夠有多個消費者
    (2)生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱以後發佈的消息。JMS規範容許客戶建立持久訂閱,這在必定程度上放鬆了時間上的相關性要求。持久訂閱容許消費者消費它在未處於激活狀態時發送的消息。
    3:在點對點消息傳遞域中,目的被稱爲隊列(queue);在發佈/訂閱消傳遞域中,
    目的被稱爲主題(topic)
    ActiveMQ(三)——理解和掌握JMS
    Connection factory:鏈接工廠,用來建立鏈接對象,以鏈接到JMS的provider
    JMS Connection:封裝了客戶與JMS提供者之間的一個虛擬的鏈接
    JMS Session:是生產的和消費消息的個單線程上下文會話用於建立消息生產者(producer)消息消費者(consumer)和消息(message)等。會話提供了一個事務性的上下文,在這個上卜文中,一組發送和接收被組合到了個原子操做中。
  • Destination:消息發送到的目的地
  • Acknowledge:簽收
  • Transaction:事務
  • JMS client:用來收發消息的Java應用
  • Non-JMS client:使到JMS provider本地API寫的應用,用來替換JMS API實現收發消息的功能,一般會提供其餘的一些特性,好比:COR、RMIT等。
  • Administered objects:預約義的JMS對象,一般的provider規範中有定義,提供給JMS客戶來司好比:ConnectionFactory和Destination

2、 jms的消息結構session

  • JMS消息山如下幾部分組成,消息頭,屬性和消息體
  • 消息頭包含消息的識別信息和路由信息,消息頭包含一些標準的屬性以下
    1.JMS Destination:由send方法設置
    2.JMSDeliveryMode:由send方法設置
    3.JMSExpiration:由send方法設置
    4.JMSPriority:由send方法設置
    5.JMSMessageID:由send方法設置
    6.JMSTimestamp:由客戶端設置
    7.JMSCorre1ationID:由客戶端設置
    8.JMSReplyTo:由客戶端設置
    9.JMSType:由客戶端設置
    10.JMSRedelivered:由JMS Provider設置
    標準的JMS消息頭包含如下屬性:
    1:JMSDestination:消息發送的目的地:主要是指Queue和Topic,自動分配
    2:JMSDe1iveryMode:傳送模式。有兩種:持久模式和非持久模式。一條持久性的消息應該被傳送「一次僅僅一次」,這就意味者若是JMS提供者出現故障,該消息並不會丟失,它會在服務器恢復以後再次傳遞。一條非持久的消息最多會傳送一次,這味這服務器出現故障,該消息將永遠丟失。自動分配
    3:JMSExpiration:消息過時時間,等於Destination的send方法中的timeToLive值加上發送時刻的GMT時間值。若是timeToLive值等於零,則JMSExpiration被設爲零,表示該消息永不過時。若是發送後,在消息過時時間以後消息尚未被髮送到目的地,則該消息被清除。自動分配
    4:JMSPriority:消息優先級,從0-9十個級別,0到是普通消息,5一9是加急消息。JMS不要求JMS Provider嚴格接照這十個優先級發送消息,但必須保證加尋消息要先於普通消息到達。默認是4級。自動分配
    5:JMSMessageID:惟一識別每一個消息的標識,由JMS Provider產生。自動分配
    6:JMSTimestamp:一個JMS Provider在調用send()方法時自動設置的,它是消息被髮送和消費者實際接收的時間差。自動分配
    7:JMSCorre1ationID:用來鏈接到另一個消息,典型的應用是在回覆消息中鏈接到原消息。在大多數狀況下,JMSCorre1ationID用於將一條消息標記爲對JMSMessageID標示的上條消息的應答,不過,JMSCorre1ationID能夠是任何值,不只僅是JMSMessageID。由開發者設置
    8:JMSReplyTo:提供本消息回覆消息的目的地。由開發者設置
    9:JMSType:消息類類型的識別符。由開發者設置
    10:JHSRedelivered:若是一個客戶端收到一個設置了JMSRede1ivered屬性的消息,則表示可能客戶端曾經在早些時候收到過該消息,但並無簽收(acknowledged),若是該消息被從新傳送,JMSRede1ivered=true反之,
    JMSRedelivered=false。自動設置dom

  • 消息體,JMS API定義了5種消息體格式,也叫消息類型,可使用不一樣形式發送接收數據,並以兼容現有的消息格式。包括:TextMessage、apMessage、BytesMessage、StreamMessage和0bjectMessage
  • 消息屬性,包含如下三種類型的屬性
    1.應用程序設置和添加的屬性,好比:
    Message.setStringProperty(username,username)
    2:JMS定義的屬性
    史用「JMSX」做爲屬性名的前綴,
    connection.getMetaData0,getJMSXPropertyNames(),方法返回全部鏈接支持
    的JMSX屬性的名字。
    3:JMS供應的特定的屬性
  • JMS定義的屬性以下:
    1:JMSXUserID:發送消息的用戶標識,發送時提供商設置
    2:JMSXAppID:發送消息的應用標識,發送時提供商設置
    3:JMSXDe1iveryCount:轉發消息重試次數,第一次是1,第二次是2,...。發送時提供商沒置
    4:JMSXGroupID:消息所在消息組的標識,由客戶端設置
    5:JMSXGroupSeq::組內消息的序號,第一個小時是1,第二個是2,...。由客戶端設置
    6:JMSXProducerTXID:產生消息的事務的事務標識,發送時提供商設置
    7:JMSXConsumerTXID:消費消息的事務的事務標識,接收時提供商設置
    8:JMSXRcvTimestamp:JMS轉發消息到消費者的時間,接收時提供商設置
    9:JMSXState:假定存在一個消息倉庫,它存儲了每一個消息的的單獨拷貝,且這些消息從原始消息被髮送時開始。每一個拷貝的狀態有:1(等待),2(準備〕,3(到期)或,4(保留)。因爲狀態與生產者和消費者無關,因此它不是由它們來提供。它只和在倉庫中查找消息相關,所以JMS沒有提以這種API。由提供商設置異步

  • 測試案例
    ActiveMQ(三)——理解和掌握JMS
    ActiveMQ(三)——理解和掌握JMS

3、JMS的可靠性機制tcp

  • 消息接收確認
           JMS消息只有在被確認以後,才認爲已經被成功地消費了。消息的成功消費一般包含三個階段:客戶接收消息、客戶處理消息和消息被確認。
           在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,消息什麼時候被確認取決於建立會話時的應答模式(acknowledgementmode)。該參數有如下三個可選值:
           Session.AUTO_ACKNOWLEDGE:當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。
           Session.CLIENT_ACKNOWLEDGE:客戶經過調用消息的acknowledge方法確認消息。須要注意的是,在這種模式中,確認是在會話層上進行,確認一個被消費的消息將自動確認全部已被會話消費的消息。例如,若是一個消息消費者消費了10個消息,而後確認了第5個消息,那麼全部10個消息都被確認。
           Session.DUPS_ACKNOWLEDGE:該選擇只是會話遲鈍的確認消息的提交。若是JMS provider失敗,那麼能會致使一些重複的消息。若是是重複的消息,那麼JMS provider必須把消息頭的JMSRedelivered字段設置爲true。
    ActiveMQ(三)——理解和掌握JMS
  • 消息持久性,JMS支持如下兩種消息提交模式:
    PERSISTENT:指示JMS provider持久保存消息,以保證的消息不會由於JMS provider的失敗而丟失
    NON_PERSISTENT:不要求JMS provider持久保存消息
  • 消息優先級
    可使用消息優先級別來指示JMS provider首先提交緊急的消息。優先級分10個級別,從0(最低)到9(最高)。若是不指定優先級,默認級別是4。須要注意的是,JMS provider並不必定保證接照優先級的順序提交消息
  • 消息過時
    能夠設置消息在必定時間後過時,默認是永不過時
  • 消息的臨時目的地
    能夠經過會話上的createTemporaryQueue方法和createTemporaryTopic
    力法來建立臨時目的地。它們的存在時間只限於建立它們的鏈接所保持的時間。
    只有建立臨時目的地的鏈接上的消息消費者纔可以從臨時目的地中提取消息。
    ActiveMQ(三)——理解和掌握JMS
  • 持久訂閱
           首先消息生產者必須使用PERSISTENT提交消息。客戶能夠經過會話上的
    createDurab1eSubscriber方法來建立一個持久訂閱,該方法的第一個參數必須
    是一個topic。第二個參數是訂閱的名稱。
           JMS provider會存儲發佈到持久訂閱對應的topic上的消息。若是最初建立
    持久訂閱的客戶或者任何其它客戶,使用相同的鏈接工廠和鏈接的客戶ID,相同的主題和相同的到訂閱名,再次調用會話上的createDurab1eSubscriber方法,那麼持久訂閱就會被激活。JMS provider會向客戶發送客戶處於非激活狀態時所發佈的消息。
           持久訂閱在某個時刻只能有一個激活的訂閱者。持久訂閱在建立以後會一直保留,直到應用程序調用會話上的unsubscribe方法。ide

  • 本地事務
           在一個JMS客戶端,可使用本地事務來組合消息的發送和接收。JMS Session接口提供了commit和rollback方法。事務提交意味着生產的全部消息被髮送,消費的全部消息被確認;事務回滾意味生產的的全部消息被銷燬,消費的全部消息被恢復並從新提交,除非它們已通過期。
           事務性的會話老是牽涉到事務處理中,commit或rollback方法一旦被調用,一個事務就結束了,而另外一個事務被開始。關閉事務性會話將回滾其中的事務。得要注意的是,若是使用請求/回覆機制,即發送一個消息,同時但願在同一個事務中等待接收該消息的回覆,那麼程序將被掛起,由於知道事務提交,發送操做纔會真正執行。
           須要注意的還有一個,消息的生產和消費不能包含在同一個事務中。

4、JMS的PTP模型工具

  • JMS PTP(Point-to-Point)模型定義了客戶端如向隊列發送消息,從隊列接收消息,以及瀏覽隊列中的消息
           PTP模型是基於隊列的,生產者發消息到隊列,消費者從隊列接收消息,隊列的存在使得消息的異步傳輸成爲金可能。和郵件系統中的郵箱同樣,隊列能夠包含各類消息,JMS Provider提供工具管理隊列的建立、刪除。
  • PTP的一些特色
    1:若是在Session關閉時,有一些消息己經被收到,但尚未被簽收(acknowledged),那麼,當消費者下次鏈接到相同的隊列時,這些消息還會被再
    次接收
    2:若是用戶在receive方法中設定了消息選擇條件,那麼不符合條件的消息會留在隊列中,不會被接收到
    3:隊列能夠長久地保存消息直到消費者收到消息。消費者不須要由於擔憂消息會丟失而時刻和隊列保持激活的鏈接狀態,充分體現了異步傳輸模式的優點

5、JMS的Pub/Sub模型測試

  • JMSPub/Sub模型定義瞭如何向一個內容節點發布和訂閱消息,這些節點被稱做topic主題,能夠被認爲是消息的傳輸中介,發佈者(publisher)發佈消息到主題,訂閱者(subscribe〕從主題訂閱消息。主題使得消息訂閱者和消發佈者保持互相獨立,不須要接觸便可保證消息的傳送。
  • JMSPub/Sub的一些特色.
    1:消息訂閱分爲非持久訂閱和持久訂閱
           非持久訂閱只有當客戶端處於激活狀態,也就是和JMS Provider保持鏈接狀態才能收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到。
           持久訂閱時,客戶端向JMS註冊一個識別本身身份的ID,當這個客戶端處於離線時,JMS provider會爲這個ID保存全部發送到主題的消息,當客戶再次鏈接到JMS provider時,會根據本身的ID獲得全部當本身處於離線時發送到主題的消息。
    2:若是用戶在receive方法中設定了消息選擇條件,那麼不符合條件的消息不會被接收:非持久訂閱狀態下,不能恢復或從新派送一個未簽收的消息。只有持久訂閱才能恢復或從新派送一個未簽收的消息。
    4:當全部的消息必須被接收,則用持久訂閱。當丟失消息可以被容忍,則用非持久訂閱
    6、JMS的API結構(和開發相關)
    ActiveMQ(三)——理解和掌握JMS

7、JMS應用開發的基本步驟
ActiveMQ(三)——理解和掌握JMS線程

8、非持久的Topic消息示例code

  • 對於非持久的Topic消息的發送
    基本跟前面發送隊列信息是同樣的,只是把建立Destination的地方,由創
    建隊列替換成建立Topic,例如:
    Destination destination = session.createTopic(MyTopic");

  • 對於非持久的Topic消息的接收
    1:必需要接收方在線,而後客戶端發送信息,接收方纔能接收到消息
    2:一樣把建立Destination的地方,山建立隊列替換成建立Topic,如:
    Destination destination = session.createTopic('MyTopic);

3:因爲不知道客戶端發送多少信息,所以改爲while循環的方式了,例如:

Message message = consumer.receive();
while(message!=null){
TextMessage txtMsg= (TextMessage)message;
System.out.println(「收到消息:」+txtMsg.getText());
message = consumer.receive(1000L);
}
  • 完整示例

    public class NoPersistenceSender {
    public static void main(String[] args) throws Exception{
        //鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
    
        //帶事務的session
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("myTopic");
        MessageProducer producer = session.createProducer(destination);
    
        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("message---"+i);
            producer.send(message);
        }
        session.commit();
        session.close();
        connection.close();
    }
    }
public class NoPersistenceReceive {
    public static void main(String[] args) throws Exception{
        //鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //帶事務的session
        final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic("myTopic");
        MessageConsumer consumer = session.createConsumer(destination);

        Message message = consumer.receive();
        while(message!=null){
            TextMessage txtMsg= (TextMessage)message;
            System.out.println("收到消息:"+txtMsg.getText());
            message = consumer.receive(1000L);
        }
        session.commit();
        session.close();
        connection.close();
    }
}

注意:對於非持久性的Topic消息,則須要接收者保持運行狀態,否則消息發送者發出的消息會接收不到。
ActiveMQ(三)——理解和掌握JMS

9、持久的Topic消息示例
1:須要在鏈接上設置消費者ID,用來識別消費者
2:須要建立TopicSubscriber來訂閱
3:要設置好了事後再start這個connnection
4:消費者必定要先運行一次,等於向消息服務中間件註冊這個消費者,而後再運行客戶端發送消息,這個時候不管消費者是否在線,都會接收到,下次鏈接的時候,會把沒有收過的消息都接收下來。

public class PersistenceSender {
    public static void main(String[] args) throws Exception{
        //鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();

        //帶事務的session
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("myTopic");
        MessageProducer producer = session.createProducer(destination);

        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("message---"+i);
            producer.send(message);
        }
        session.commit();
        session.close();
        connection.close();
    }
}
public class PersistenceReceive {
    public static void main(String[] args) throws Exception{
        //鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.setClientID("cc1");

        //帶事務的session
        final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

        Topic destination = session.createTopic("myTopic");
        TopicSubscriber ts = session.createDurableSubscriber(destination,"T1");
        connection.start();

        Message message = ts.receive();
        while(message!=null){
            TextMessage txtMsg= (TextMessage)message;
            System.out.println("收到消息:"+txtMsg.getText());
            message = ts.receive(1000L);
        }
        session.commit();
        session.close();
        connection.close();
    }
}

ActiveMQ(三)——理解和掌握JMS

10、總結

  • 持久化消息
    這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。可靠性的另外一個重要方面是確保持久性消息傳送至目標後,消息服務在向消費者傳送它們以前不會丟失這些消息。
    這意味着在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。若是消息服務因爲某種緣由致使失敗,它能夠恢復此消息並將此消息傳送至相應的消費皆。雖然這樣增長了消息傳送的開銷,但卻增長了可靠性。
  • 非持久化消息保證這些消息最多被傳送一次。對於這些消息,可靠性並不是主要的考慮因素。此模式並不要求持久性的數據存儲,不保證消息服務因爲某種緣由致使失敗後消息不會丟失。有兩種方法指定傳送模式:1.使用setDe1iveryMode方法,這樣全部的消息都採用此傳送模式;如producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)2.使用send方法爲每一條消息設置傳送模式
相關文章
相關標籤/搜索