消息中間件,Message-Oriented Middleware,簡稱MOM。
採用消息中間件的做用通常有兩點:一是解耦,二是異步(起到削峯填谷的做用)java
點對點模型(基於隊列的模式,若是有多個消費者,那麼這些消費者輪流消費消息,達到負載均衡)編程
發佈訂閱模型
基於topic的發佈/訂閱模型較爲流行,一般以此模型爲主,外加點對點模型實現生產者/消費者的負載均衡。api
推技術:消息服務器主動推送消息到消息消費者(JMS的Pub/Sub採用的是推技術)服務器
拉技術:消息消費者定時去消息服務器取消息(JMS接收者可以推送或拉取消息,取決於它是否使用異步onMessage回調或者是同步receive方法)網絡
同步receive方式就是拉的方式,消費者主動去消息服務器取消息,異步的listener方式爲推的方式。負載均衡
同步receive的實例:異步
// Create the sender and send the message QueueSender qSender = qSession.createSender(requestQ); qSender.send(msg); // Wait to see if the loan request was accepted or declined String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter); TextMessage tmsg = (TextMessage)qReceiver.receive(30000); if (tmsg == null) { System.out.println("Lender not responding"); } else { System.out.println("Loan request was " + tmsg.getText()); }
基本的概念:消息生產者、消息消費者、消息、目的地/topic/路由ide
一條消息可分爲 3 部分:消息頭(Head)、屬性(Attribute)和有效負載(Payload,即消息體)。ui
這部分一般是結構化的數據,提供了和消息有關的元數據。spa
消息惟一標識
目的地
傳送方式(持久/非持久)
消息接收時間
消息失效時間
消息優先級
是否重發等
用於JMS的高級功能特性,以及存儲一些應用特有的屬性或者JMS廠商提供的額外功能屬性。
A、高級功能:groupId/groupSeq,消息的分組聚合
B、應用本身添加的屬性:好比username等
C、特殊廠商的額外功能屬性
通常能夠有結構化的和非結構化的,JMS裏頭定義的比較詳細,有Text、Object、Bytes、Stream、Map等格式。可是通常大多byte[]格式,本身能夠選擇自定義的序列化方式。
在目的地上使用消息選擇器,利用消息屬性和消息頭(沒法使用消息體內的數據)做爲條件表達式的準則,消息在目的地(隊列/主題)分發給消費者以前就過濾好。(rabbitmq有基於routing key的表達式過濾方式,來選擇接收哪幾個topic的消息)
消息在目的地分發給消費者以前過濾。
使用MessageFitler的優勢是,具備更強的可伸縮性,假設增長了一個CustType爲PLATINUM級別,那麼只要增長一個相應的消息消費者就能夠。
消息在發送給目的以前過濾,分發給不一樣目的地不一樣的消息
若是是使用Multiple Destination方式,就是增長一個隊列來保存PLATINUM,同時還得增長一個類來監聽這個新的隊列。
MultipleDestination的好處是生產者控制消息過濾分類,不容易出錯,可是可擴展性稍差,MessageFilter是可擴展性好,可是容易出錯。
這些消息不會傳送給該訂閱者,不管是持久訂閱仍是非持久訂閱
未被消費者選擇的全部消息,對該消費者都是不可見的。
確認由發送者或發佈者生產的全部消息各自都有和它們相關聯的有效期,默認狀況是永不過時,那麼意味着若是一條消息被過濾掉,沒有傳送給消費者,它將在隊列中永久駐留,能夠經過設置生存時間來解決。
有些廠商提出了死信隊列(Dead Letter Queue,DLQ),或停用消息隊列(Dead Message Queue,DMQ)的概念,來處理那些被認爲沒法傳送的消息。
最簡單的狀況,消息傳送系統將全部沒法傳送的消息放入DMQ,而應用程序負責監測它的內容。也能夠支持管理型事件,可以在消息放入DMQ時通知應用程序。
對於rabbitmq,有一個dead letter的機制(當消息在一個隊列中變成死信後,它能被從新publish到另外一個Exchange,這個Exchange就是DLX)。
保證傳送機制(guaranteed delivery),確保即使發生了具有故障,預約消費者最終也會接收到這條消息。
當消費者出故障時,將消息保存到持久化介質中,等待消費者恢復以後,從持久介質取出消息,轉發給消費者
public interface DeliveryMode { /** This is the lowest-overhead delivery mode because it does not require * that the message be logged to stable storage. The level of JMS provider * failure that causes a <CODE>NON_PERSISTENT</CODE> message to be lost is * not defined. * * <P>A JMS provider must deliver a <CODE>NON_PERSISTENT</CODE> message * with an * at-most-once guarantee. This means that it may lose the message, but it * must not deliver it twice. */ static final int NON_PERSISTENT = 1; /** This delivery mode instructs the JMS provider to log the message to stable * storage as part of the client's send operation. Only a hard media * failure should cause a <CODE>PERSISTENT</CODE> message to be lost. */ static final int PERSISTENT = 2; }
Message默認是PERSISTENT模式。
這個地方是JMS裏頭容易讓人混亂的地方,其本質上,仍是利用消息頭的deliveryMode屬性來標記的。
能夠有兩個地方能夠設置:
是發送消息的時候,設置是否持久化;
msg.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
默認就是持久化模式的。
是在接收消息以前設置傳送模式。好比對於topic提供durableSubscriber的方法。
public interface TopicSession extends javax.jms.Session { javax.jms.Topic createTopic(java.lang.String s) throws javax.jms.JMSException; javax.jms.TopicSubscriber createSubscriber(javax.jms.Topic topic) throws javax.jms.JMSException; javax.jms.TopicSubscriber createSubscriber(javax.jms.Topic topic, java.lang.String s, boolean b) throws javax.jms.JMSException; javax.jms.TopicSubscriber createDurableSubscriber(javax.jms.Topic topic, java.lang.String s) throws javax.jms.JMSException; javax.jms.TopicSubscriber createDurableSubscriber(javax.jms.Topic topic, java.lang.String s, java.lang.String s1, boolean b) throws javax.jms.JMSException; javax.jms.TopicPublisher createPublisher(javax.jms.Topic topic) throws javax.jms.JMSException; javax.jms.TemporaryTopic createTemporaryTopic() throws javax.jms.JMSException; void unsubscribe(java.lang.String s) throws javax.jms.JMSException; }
當客戶端和服務器之間的網絡鏈接丟失時,JMS提供者必須儘量從新創建鏈接。若是該JMS提供者沒法從新自動鏈接,在客戶端調用某種可以引發網絡流量的方法時,提供者必須拋出一個異常,向客戶端通知這個狀況。JMS提供了ExceptionListener接口,用於捕獲丟失的鏈接(能夠在捕獲時重連),並向客戶端通知這個狀況。與MessageListener不一樣,MessageListener是與會話綁定在一塊兒的。
通常是自動肯定、手動肯定。
TopicPublisher.publish和QueueSender.send方法是同步的,這些方法負責發送消息,同時進行阻塞,直到從消息服務器接收到一個確認爲止。一旦接收到一個確認,執行線程就會恢復並返回方法,認爲消息發送成功。底層確認對客戶端編程模型來講是不可見的。若是在這個操做期間發生了一個故障狀況,就會拋出一個異常,同時認爲該消息未被傳送(注意從新發送指的是消費服務器到消費者的從新發送)。
若是會話是AUTO_ACKNOWLEDGE模式,當每一個消費者得到消息時,JMS提供者的客戶端運行時環境必須自動向服務器發送確認信息。若是服務器沒有接收到這個確認信息,它就會認爲該消息未被傳送,並可能會試圖從新傳送。
確認消息從服務器發送到生產者,意味着服務器已經接收到該消息,並已經承擔了傳送它的責任。從JMS服務器的角度來看,發送到生產者的確認並未和消息傳送直接關聯。邏輯上,它們是兩個獨立的步驟。
A、對於持久消息來講,服務器將消息寫入磁盤,而後在再通知生產者該消息已經被接收。
B、對於非持久消息,意味着服務器可以在接收到消息後馬上通知發送者,並將該消息存入內存。若是該消息的主題沒有訂閱者,根據廠商的不一樣,也可能會將該消息拋棄。
A、對於持久訂閱者來講
一直到消息服務器接收到全部的消息預約接收者的確認時,消息服務器纔會認爲該消息已經完成傳送。要得到這些信息,必須對每一個消費者都很是瞭解:哪個客戶端已經接收到每條消息,哪個尚未接收到。一旦消息服務器將消息傳送給全部的已知訂閱者,並已分別從訂閱者那裏接收到確認,就會將這條消息從持久存儲器中刪除。若是持久訂閱,而訂閱者當前並未鏈接,那麼消息服務器會將該消息保存起來,直到該訂閱者變成可用狀態或消息到期爲止。甚至對於非持久消息來講,也是如此。也就是說,若是消息消費者設定爲持久訂閱,則無論消息生產者設定消息是不是持久的,當消費者不在時,老是保存,恢復時轉發。
B、對於非持久性消息來講
在消息服務器已經向發送者確認消息後,以及消息服務器有機會表明未鏈接的持久訂閱者將消息寫入磁盤以前,兩者之間可能會有一個時間窗,若是JMS在這個時間窗內出現故障,該消息就可能丟失。
C、若是是使用持久消息時
一個提供者可能會出現故障,可是會優雅恢復正常。因爲消息保存在持久存儲器中,它們並無丟失,在提供者再次啓動時,它們又會傳送給消費者。若是是點對點隊列,它們可以保證被傳送出去,若是是發佈訂閱發送,只有消費者的訂閱爲持久性時,才能被保證傳送出去,非持久訂閱的傳送行爲因提供者不一樣而不一樣。
生產者在commit以前,JMS提供者不會開始向它的消費者傳送消息,即便它已經從發送者那裏接收到全部的消息。
發送消息時,若是在發送消息的方法正常完成後沒有調用commit方法,JMS提供者會從隊列中刪除這些消息,而這些消息並無傳送給消費者。
消息會盡量快地傳送給接收者,可是它們一直由JMS提供者保存,直到接收者在會話對象上發佈commit爲止,若是發生了故障或調用了rollback,提供會試圖從新傳送這些消息,這種狀況下,這些消息會設置爲從新傳送標記。
接收消息時,若是在接收消息方法正常完成後沒有調用commit方法,消息就會被標記爲未被傳送,JMS提供者會將這些消息從新傳送給消費者,並將JMSRedelivered標記爲true,表示此前曾試圖處理過這些消息。