![]() |
![]() |
JMSAPI能夠分爲3個主要部分:公共API、隊列API和主題API。spring
JMSAPI中,ConnectionFactory和Destination既能夠做爲受管對象,由JMS提供者建立,並使用JNDI從提供者得到,也能夠直接動態建立;服務器
其餘接口經過工廠方法建立,好比Session能夠經過Connection建立;網絡
消息生產者和消費者通常僅建立一個鏈接(Connection),而能夠建立多個會話(Session);session
由Session保存用於消息發送的事務性工做單元。異步
經過Session能夠建立Message,MessageProducer,MessageConsumer。ide
![]() |
消息可分爲3部分:消息頭、屬性和有效負載。spa
其中消息又分爲自動分配消息頭和開發者分配消息頭。線程
自動分配的消息頭:3d
1)JMSDestination,消息目的地。對象
2)JMSDeliveryMode,消息傳送模式,默認設置爲持久性模式。
3)JMSMessageID,消息ID。
4)JMSTimestamp,JMS提供者接收消息的時間。
5)JMSExpiration,消息過時時間,默認值爲0,即永不過時。
6)JMSRedelivered,是不是重發的消息。
7)JMSPriority,0~4級是普通優先級,而5~9級則是加急優先級。
開發者分配的消息頭:
1)JMSReplyTo,接收消息後發送響應消息的目的地。
2)JMSCorrelationID,與消息相關聯的特定ID,在大多數狀況下,用於接收消息後發送響應消息時,記錄上一個消息的ID。
3)JMSType,消息類型。
屬性:
與消息頭相似,由開發者添加,支持String、Int、Boolean、Double、Float、Byte、Long、Short、Object等類型,可用於消息選擇;
有效負載,即實際傳輸的消息內容。
根據要攜帶的有效負載類型,JMS定義了6種消息接口:
Message,不含有效負載;
TextMessage,攜帶一個String做爲有效負載;
ObjectMessage,攜帶一個可序列化Java對象做爲有效負載;
BytesMessage,攜帶一組原始類型字節流;
StreamMessage,攜帶一個Java原始數據類型流;
MapMessage,攜帶一組鍵值對做爲有效負載。
消息持久化,經過JMSDeliveryMode設定。在持久性模式下,消息在服務器端被持久化保存,直至消費者接收到該消息,所以能夠保證消息成功發送有且僅有一次,而非持久性模式只能保證消息最多發送一次,而不能保證消息被成功接收。
![]() |
對於消息生產者相關API的使用,咱們分隊列和主題分別介紹。
在隊列下:
經過ActiveMQ提供的鏈接工廠類建立queueConnection。
queueConnection調用start方法真正創建與消息服務器的長鏈接。
調用queueConnectoin的createQueueSession方法建立負責消息相關操做的會話單元。
調用queueSession的createQueue、createSender、createMessage方法分別建立隊列、發送者和消息。
最後調用queueSender的send方法發送消息,消息發送是一個同步操做,在send方法中線程會被阻塞,直到接收到來自消息服務器的確認。
在主題下,消息生產過程當中各接口的調用方法與隊列下的基本相似,只是使用了與主題相關的一些接口。所以,對於消息生產,可使用公共API中的接口來完成。
![]() |
使用公共API中的接口實現消息生產者,其中使用了公共API中的接口Connection、Session、Destination和MessageProducer。
在建立destination時,調用createTopic方法和createQueue方法來分別建立主題和隊列。
![]() |
對於消息消費者相關API的使用,咱們也分隊列和主題分別介紹。
在隊列下,調用queueSession的createReceiver方法建立接收者。
調用queueReceiver的receive方法接收消息,在receive方法中,線程將被阻塞直至接收到消息。也能夠在調用receive方法時,設置最長延時,若該時間內未收到消息,receive方法將直接返回。
在主題下,調用topicSession的createSubscriber方法建立訂閱者。
另外須要經過實現MessageListener接口,實現消息偵聽器,並重寫偵聽器的onMessage方法。經過topicSubscriber的setMessageListener將偵聽器註冊到訂閱者中。當消息服務器將一條消息推向topicSubscriber時,topicSubscriber將調用偵聽器的onMessage方法對消息進行具體處理。
![]() |
使用公共API中的接口實現消息消費者。
對於隊列和主題,除了在建立destination時的方法不一樣外,在消息接收時,隊列下的consumer是經過receive方法阻塞線程主動接收消息,主題下的comsumer是經過設置偵聽器偵聽推送過來的消息。
![]() |
對於持久訂閱者,它與普通訂閱者的區別是:對於普通訂閱者,在不鏈接時,發送的消息將被丟失,鏈接後這些消息不會被接收,而對於持久訂閱者,在不鏈接時,發送的消息將被保存,鏈接後這些消息會被正常接收。
持久訂閱者的建立與普通訂閱者的建立有兩點不一樣:一是在鏈接中須要設置ClientID,二是在建立訂閱者時調用createDurableSubscriber方法建立持久訂閱者,並設置訂閱者名稱。消息服務器將ClientID和訂閱者名稱做爲持久訂閱者的惟一標識符,在不鏈接時,爲其保留一份消息副本,在鏈接時,將消息發送至該持久訂閱者。
若要完全關閉持久訂閱者,使消息服務器再也不爲其保留消息副本,則須要調用unsubscribe方法。
持久訂閱者的優勢是無論訂閱者是否鏈接,消息都能被正常接收。
持久訂閱者的缺點是若訂閱者一直不鏈接,消息將被一直保存,形成存儲壓力。
所以是否選用持久訂閱者要根據具體應用場景來決定,若須要保證訂閱者在有鏈接丟失的狀況下仍能接收到全部消息,則採用持久訂閱者,若只須要訂閱者在鏈接時接收消息,且容許有必定的消息丟失,則採用普通訂閱者。
![]() |
隊列中的消息只能被一個消費者消費,若查看隊列中的消息,但不消費消息,可使用QueueBrowser。
經過調用getEnumeration方法能夠按照消息接收順序獲取到當前隊列中的全部消息。
![]() |
對於消費者,有時候須要處理知足特定條件的消息,一個可行的解決方案是在處理消息前,增長條件判斷,對於不知足條件的消息不做處理,但這個方案存在的一個問題是因爲消息已經被該消費者接收,但不會被處理,從而形成網絡帶寬的浪費,另外不做處理的消息也不能再被其餘可能會處理這些消息的消費者接收。
咱們能夠經過消息選擇器解決上述問題。使用消息選擇器能夠實現消息的過濾,只接收知足特定要求的消息。
消息選擇器能夠增長的條件包括消息頭和消息屬性。消息頭中能夠用做選擇條件的有JMSDeliveryMode,JMSPriority,JMSMessageID,JMSTimestamp,JMSCorrelationID,JMSType,例如能夠選擇權重值較高的消息。在消息屬性中能夠設置業務屬性,這樣就能夠選擇知足特定業務需求的消息。
消息選擇器的基本表達式由標識符、比較運算符和常量組成,例如,JMSPriority> 5,即選擇權重大於5的消息。
多個基本表達式能夠由AND、OR組成複雜表達式,例如,JMSPriority> 5 AND Oper= ‘Delete’,即選擇權重大於5且業務上操做類型爲「Delete」的消息。
消息選擇器在建立消費者時設置,以下所示。
![]() |
對於持久化消息,消息服務器會對消息進行確認,以保證消息成功發送。
消息確認機制有以下幾種:AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,CLIENT_ACKNOWLEDGE,
消息確認機制能夠在建立session時指定。
AUTO_ACKNOWLEDGE是由消息服務器自動完成消息確認。
消息的發送和接收是兩個異步過程,所以分別討論這兩個過程當中AUTO機制的消息確認。
消息發送時:
1)生產者發送消息,生產者阻塞線程等待消息服務器接收到消息的確認。
2)消息服務器接收到消息後,對消息進行持久化。
3)持久化成功後,消息服務器向生成者發送確認通知。
4)生產者接收到確認通知後,從發送方法返回。
消息接收時:
5)消費者接收消息。
6)消費者發送確認通知。
7)消息服務器從存儲中刪除消息。
DUPS_OK_ACKNOWLEDGE不一樣於AUTO對單條消息進行確認,是一種延時、批量確認機制,這樣作的一個好處是能夠減小因單條消息確認而帶來的系統開銷,但帶來的一個問題是可能將一條消息向同一目的地發送兩次以上,所以,適用於能夠重複接收消息的應用場景。
CLIENT_ACKNOWLEDGE是由消費者經過調用消息的acknowledge方法進行確認,若是消費者使用CLIENT機制,且調用acknowledge方法,那麼在上次確認之間所發的未被確認的消息都將被確認。
INDIVIDUAL_ACKNOWLEDGE也是由消費者經過調用消息的acknowledge方法進行確認,但不一樣於CLIENT機制,每條消息都須要調用acknowledge方法進行確認。
![]() |
消息事務能夠保證:
若是在一個會話中,只有消息生產者或只有消息消費者,則對於消息生產者,事務將保證其全部消息都發送至服務器或都不發送,而對於消息消費者,事務將保證其接收全部消息或都不接收。
若是在一個會話中,既有消息生產者,又有消息消費者,則事務將保證全部消息都發送並接收或都不發送和接收。
建立會話時能夠設置是否使用事務。
屢次操做後經過調用commit()提交。在出現異常時,經過調用rollback()回滾。
![]() |
Spring對JMS提供支持,在客戶端能夠經過Spring配置鏈接、生產者和消費者。
經過spring提供的DefaultMessageListenerContainer能夠設置消息的偵聽,經過spring提供的JmsTemplate能夠進行消息的發送和接收。