activeMq-JMS消息可靠性機制-4

 

消息接收確認

  JMS消息只有在被確認以後,才認爲已經被成功地消費了。
  消息的成功消費一般包含三個階段:客戶接收消息、客戶處理消息和消息被確認。session

  //參數1:是否啓用事務(false表示不開啓事務)   參數2:接收模式(通常設置爲自動接收)
  Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

  在事務性會話中,當一個事務被提交的時候(session.commit() ),確認自動發生。異步

  在非事務性會話中,消息什麼時候被確認取決於建立會話時的應答模式(acknowledgement mode)。
    該參數有如下四個可選值:
      Session.AUTO_ACKNOWLEDGE:
            當客戶成功的從receive方法返回的時候,或者從
            MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。
            // 測試時接收端一直掛起沒有結束,緣由是listener新起了一個線程一直監聽,若是再次運行接收端
            // 新開一個consumer是接收不到消息的,消息已經被第一個消費了tcp

      Session.CLIENT_ACKNOWLEDGE:
            客戶經過調用消息的acknowledge方法確認消
            息。須要注意的是,在這種模式中,確認是在會話層上進行,確認一個被消費的消息
            將自動確認全部已被會話消費的消息。例如,若是一個消息消費者消費了10 個消
            息,而後確認第5 個消息,那麼全部10 個消息都被確認。            ide

          開發者須要須要關注幾個方法:
            1) message.acknowledge(),
            2) ActiveMQMessageConsumer.acknowledege(),
            3) ActiveMQSession.acknowledge();
          其1)和3)是等效的,將當前session中全部consumer中還沒有ACK的消息都一塊兒確認,
          2)只會對當前consumer中那些還沒有確認的消息進行確認。
          一般會在基於Group(消息分組)狀況下會使用CLIENT_ACKNOWLEDGE,
          咱們將在一個group的消息序列接受完畢以後確認消息(組);不過當你認爲消息很重要,
          只有當消息被正確處理以後才能確認時,也可使用此模式 。         工具

          Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
          Queue queue = session.createQueue("queue");
          MessageConsumer consumer = session.createConsumer(queue);
          int i = 0;
          while(i < 3){
           TextMessage msg = (TextMessage)consumer.receive();
          if(i == 2){
               // 若是不確認簽收,消息一直存在,當再次啓動客戶端會再次接收到消息

           msg.acknowledge();// 確認簽收
           }
          i++;
          }

      Session.SESSION_TRANSACTED:
            用session.commit()進行簽收 ,要麼所有正常確認,要麼所有redelivery。
            這種嚴謹性,一般在基於GROUP(消息分組)或者其餘場景下特別適合。測試

      Session.DUPS_ACKNOWLEDGE:
            該選擇只是會話遲鈍的確認消息的提交。若是JMS
            provider失敗,那麼可能會致使一些重複的消息。若是是重複的消息,那麼JMS
            provider 必須把消息頭的JMSRedelivered字段設置爲truespa

 

 消息持久性     

JMS 支持如下兩種消息提交模式:
  
PERSISTENT:只是JMS provider持久保存消息,以保證消息不會由於JMS provider的失敗而丟失
  NON_PERSISTENT:不要求JMS provider持久保存消息線程

  // producer.setDeliveryMode(DeliveryMode.PERSISTENT);將消息傳遞特性置爲持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 非持久化

 消息優先級

  可使用消息優先級來指示JMS provider首先提交緊急的消息。優先級分
  10個級別,從0(最低)到9(最高)。若是不指定優先級,默認級別是4。須要
  注意的是,JMS provider並不必定保證按照優先級的順序提交消息3d

  producer.setPriority(int i)code

消息過時

  能夠設置消息在必定時間後過時,默認是永不過時

  producer.setTimeToLive(Long aliveTime);

  注意timeToLive屬性只會在DisableMessageTimestamp=false(禁用消息時間戳)的狀況下才有意義。

 

消息的臨時目的地

  能夠經過會話上的createTemporaryQueue 方法和createTemporaryTopic
  方法來建立臨時目的地。它們的存在時間只限於建立它們的鏈接所保持的時間。
  只有建立該臨時目的地的鏈接上的消息消費者纔可以從臨時目的地中提取消息

 

持久訂閱

    首先消息生產者必須使用PERSISTENT提交消息。客戶能夠經過會話上的
  createDurableSubscriber方法來建立一個持久訂閱,該方法的第一個參數必須
  是一個topic。第二個參數是訂閱的名稱。
    JMS provider會存儲發佈到持久訂閱對應的topic上的消息。若是最初建立
  持久訂閱的客戶或者任何其它客戶,使用相同的鏈接工廠和鏈接的客戶ID,相同
  的主題和相同的訂閱名,再次調用會話上的createDurableSubscriber方法,那
  麼該持久訂閱就會被激活。JMS provider會向客戶發送客戶處於非激活狀態時所
  發佈的消息。
    持久訂閱在某個時刻只能有一個激活的訂閱者。持久訂閱在建立以後會一
  直保留,直到應用程序調用會話上的unsubscribe方法。

本地事務

    在一個JMS客戶端,可使用本地事務來組合消息的發送和接收。JMS
  Session接口提供了commit和rollback方法。事務提交意味着生產的全部消息被
  發送,消費的全部消息被確認;事務回滾意味着生產的全部消息被銷燬,消費的
  全部消息被恢復並從新提交,除非它們已通過期。
    事務性的會話老是牽涉到事務處理中,commit或rollback方法一旦被調
  用,一個事務就結束了,而另外一個事務被開始。關閉事務性會話將回滾其中的事務。
    須要注意的是,若是使用請求/回覆機制,即發送一個消息,同時但願在同
  一個事務中等待接收該消息的回覆,那麼程序將被掛起,由於知道事務提交,發
  送操做纔會真正執行。
    須要注意的還有一個,消息的生產和消費不能包含在同一個事務中

JMS的PTP模型

    JMS PTP(Point-to-Point)模型定義了客戶端如何向隊列發送消息,從隊列接收
  消息,以及瀏覽隊列中的消息。
    PTP模型是基於隊列的,生產者發消息到隊列,消費者從隊列接收消息,隊
  列的存在使得消息的異步傳輸成爲可能。和郵件系統中的郵箱同樣,隊列能夠包
  含各類消息,JMS Provider 提供工具管理隊列的建立、刪除。
  PTP的一些特色
    1:若是在Session 關閉時,有一些消息已經被收到,但尚未被簽收
      (acknowledged),那麼,當消費者下次鏈接到相同的隊列時,這些消息還會被再
      次接收
    2:若是用戶在receive 方法中設定了消息選擇條件,那麼不符合條件的消息會留在
      隊列中,不會被接收到
    3:隊列能夠長久地保存消息直到消費者收到消息。消費者不須要由於擔憂消息會丟
      失而時刻和隊列保持激活的鏈接狀態,充分體現了異步傳輸模式的優點

 JMS的Pub/Sub模型

    JMS Pub/Sub 模型定義瞭如何向一個內容節點發布和訂閱消息,這些節點被稱做topic
  主題能夠被認爲是消息的傳輸中介,發佈者(publisher)發佈消息到主題,訂閱者
  (subscribe) 從主題訂閱消息。主題使得消息訂閱者和消息發佈者保持互相獨立,不須要
  接觸便可保證消息的傳送。
  Pub/Sub的一些特色:
    1:消息訂閱分爲非持久訂閱和持久訂閱
      非持久訂閱只有當客戶端處於激活狀態,也就是和JMS Provider保持鏈接狀態才能
        收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會
        丟失,永遠不會收到。
      持久訂閱時,客戶端向JMS 註冊一個識別本身身份的ID,當這個客戶端處於離線
        時,JMS Provider會爲這個ID 保存全部發送到主題的消息,當客戶再次鏈接到JMS
        Provider時,會根據本身的ID 獲得全部當本身處於離線時發送到主題的消息。
    2:若是用戶在receive 方法中設定了消息選擇條件,那麼不符合條件的消息不會被接收
    3:非持久訂閱狀態下,不能恢復或從新派送一個未簽收的消息。只有持久訂閱才能恢復或重
      新派送一個未簽收的消息。
    4:當全部的消息必須被接收,則用持久訂閱。當丟失消息可以被容忍,則用非持久訂閱

 非持久的Topic消息示例

/*對於非持久的Topic消息的發送
基本跟前面發送隊列信息是同樣的,只是把建立Destination的地方,由創
建隊列替換成建立Topic*/
Destination destination = session.createTopic("MyTopic");
/*對於非持久的Topic消息的接收
1:必需要接收方在線,而後客戶端再發送信息,接收方纔能接收到消息
2:一樣把建立Destination的地方,由建立隊列替換成建立Topic*/
Destination destination = session.createTopic("MyTopic");
/*3:因爲不知道客戶端發送多少信息,所以改爲while循環的方式了*/
Message message = consumer.receive();
    while(message!=null) {
    TextMessage txtMsg = (TextMessage)message;
    System.out.println("收到消 息:" + txtMsg.getText());
    message = consumer.receive(1000L);
}
View Code

 

 持久的Topic消息示例

消息的發送

ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://192.168.1.106:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("MyTopic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for(int i=0; i<2; i++) {
    TextMessage message = session.createTextMessage("messagedd--"+i);
    Thread.sleep(1000);
    //經過消息生產者發出消息
    producer.send(message);
}
session.commit();
session.close();
connection.close();
1:要用持久化訂閱,發送消息者要用 DeliveryMode.PERSISTENT 模式發現,在鏈接以前設定
2:必定要設置完成後,再start 這個 connection
View Code

 

消息的接收

ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.106:61616");
Connection connection = cf.createConnection();
connection.setClientID("cc1");
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("MyTopic");
TopicSubscriber ts = session.createDurableSubscriber(destination, "T1");
connection.start();
Message message = ts.receive();
while(message!=null) {
    TextMessage txtMsg = (TextMessage)message;
    session.commit();
    System.out.println("收到消 息:" + txtMsg.getText());
    message = ts.receive(1000L);
}
session.close();
connection.close();
1:須要在鏈接上設置消費者id,用來識別消費者
2:須要建立TopicSubscriber來訂閱
3:要設置好了事後再start 這個 connection
4:必定要先運行一次,等於向消息服務中間件註冊這個消費者,而後再運行客戶端發送信息,這個時候,
不管消費者是否在線,都會接收到,不在線的話,下次鏈接的時候,會把沒有收過的消息都接收下來。
View Code

 

 

 

參考:

  Producer特性詳解:http://shift-alt-ctrl.iteye.com/blog/2034440

相關文章
相關標籤/搜索