Spring框架之jms源碼徹底解析html
咱們在前兩篇文章中介紹了Spring兩大核心IOC(Inversion of Control控制反轉)和AOP(Aspect Oriented Programming面向切面編程)技術:Spring框架之beans源碼徹底解析和Spring框架之AOP源碼徹底解析,下面對Spring的jms源碼進行分析,先對jms進行簡單的介紹,其次對Spring中jms模塊源碼文件清單進行梳理,而後對jms的單獨使用和Spring整合jms使用進行演示,最後對Spring中jms模塊有兩個核心JmsTemplate和消息監聽器源碼進行分析。java
1、jms簡介react
分佈式系統消息通訊技術主要包括:(1) RPC(Remote Procedure Call Protocol),通常是C/S方式,同步的,跨語言跨平臺,面向過程。(2)CORBA(Common Object Request Broker Architecture),CORBA從概念上擴展了RPC,面向對象的,企業級的(面向對象中間件還有DCOM)。(3) RMI(Remote Method Invocation),面向對象方式的 Java RPC。(4)WebService基於Web,C/S或B/S,跨系統跨平臺跨網絡。多爲同步調用,實時性要求較高。(5)MOM(Message oriented Middleware) 面向消息中間件。spring
面向消息中間件,主要適用於消息通道、消息總線、消息路由和發佈/訂閱的場景。目前主流標準有JMS(Java Message Service)、AMQP(Advanced Message Queuing Protocol)和STOMP(Streaming Text Oriented Messaging Protocol)。AMQP是一個面向協議的,跟語言平臺無關的消息傳遞應用層協議規範。STOMP是流文本定向消息協議,是一種爲MOM設計的簡單文本協議。AMQP和STOMP都是跟http處於同一層的協議。JMS是Java平臺上的面向接口的消息規範,是一套API標準,並無考慮異構系統。數據庫
JMS即Java消息服務應用程序接口,是一個Java平臺關於面向消息中間件的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數面向消息中間件提供商都對JMS提供支持,JMS相似於JDBC(Java Database Connectivity),JDBC 是能夠用來訪問許多不一樣關係數據庫的API,而JMS則提供一樣與廠商無關的訪問方法,用來訪問消息收發服務。因此兩個應用程序之間要進行通訊,咱們使用了一個JMS服務,進行中間的轉發,經過使用JMS,能夠解除兩個程序之間的耦合,提升消息靈活性,支持異步性。apache
JMS編程模型包含的幾個要素:編程
(1)鏈接工廠。 鏈接工廠(ConnectionFactory)是由管理員建立,並綁定到JNDI(Java命名和目錄接口)樹中。針對兩種不一樣的jms消息模型(點對點和發佈/訂閱),分別有QueueConnectionFactory和TopicConnectionFactory兩種。客戶端使用JNDI查找鏈接工廠,而後利用鏈接工廠建立一個JMS鏈接。設計模式
(2)JMS鏈接。JMS鏈接(Connection)表示JMS客戶端和服務器端之間的一個活動的鏈接,是由客戶端經過調用鏈接工廠的方法創建的。Connection表示在客戶端和JMS系統之間創建的連接(對TCP/IP socket的包裝)。Connection能夠產生一個或多個Session。跟ConnectionFactory同樣,Connection也有兩種類型:QueueConnection和TopicConnection。數組
(3)JMS會話。JMS會話(Session)表示JMS客戶與JMS服務器之間的會話狀態。JMS會話創建在JMS鏈接上,表示客戶與服務器之間的一個會話線程。Session是咱們操做消息的接口。能夠經過session建立生產者、消費者、消息等。Session提供了事務的功能。當咱們須要使用session發送/接收多個消息時,能夠將這些發送/接收動做放到一個事務中。一樣,也分QueueSession和TopicSession。緩存
通俗的講,Connection(鏈接)是一個物理概念,是指一個經過網絡創建的客戶端和專有服務器或調度器之間的一個網絡鏈接。Session(會話)是一個邏輯概念,它存在於實例中,一個Connection能夠擁有多個Session,也能夠沒有Session,同一個Connection上的多個Session之間不會相互影響。connection至關於修路,而session至關於經過這條道路的一次運輸。
(4)JMS目的。JMS目的(Destination),又稱爲消息隊列,是實際的消息源。Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來講,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來講,它的Destination也是某個隊列或主題(即消息來源)。因此Destination實際上就是兩種類型的對象:Queue、Topic。
(5)JMS消息。消息由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶着應用程序的數據或有效負載。根據有效負載的類型來劃分,能夠將消息分爲幾種類型:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。
一般有兩種類型:① 點對點(Point-to-Point)。在點對點的消息系統中,消息分發給一個單獨的使用者。點對點消息每每與隊列(javax.jms.Queue)相關聯。② 發佈/訂閱(Publish/Subscribe)。發佈/訂閱消息系統支持一個事件驅動模型,消息生產者和消費者都參與消息的傳遞。生產者發佈事件,而使用者訂閱感興趣的事件,並使用事件。該類型消息通常與特定的主題(javax.jms.Topic)關聯。
(6)消息的生產者。生產者(Message Producer)對象由Session對象建立,用於發送消息,將消息發送到Destination。一樣,消息生產者分兩種類型:QueueSender和TopicPublisher。能夠調用消息生產者的方法(send或publish方法)發送消息。
(7)消息消費者。 消費者(Message Consumer)對象由Session對象建立,用於接收消息,接收被髮送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別經過session的createReceiver(Queue)或createSubscriber(Topic)來建立。固然,也能夠session的creatDurableSubscriber方法來建立持久化的訂閱者。
(8)消息監聽器。消息監聽器MessageListener,相似於鉤子函數,hook到消息相關的事件中,換句話說,當消息被建立、開始傳輸、轉發、傳輸停止、刪除時,會調用相應的鉤子函數。如註冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。
Spring中集成JMS:
JMS是一個 Java 標準,定義了使用消息代理的通用API 。Spring 經過基於模板的抽象爲 JMS 功能提供了支持,這個模板就是 JmsTemplate 。使用 JmsTemplate可以很是容易地在消息生產方發送隊列和主題消息,在消費消息的一方也可以很是容易地接收這些消息。 (模板方法模式是一種設計模式。通俗的講就是完成一件事情,有固定的數個步驟,可是每一個步驟根據對象的不一樣,而實現細節不一樣。這樣能夠在父類中定義一個完成該事情的總方法,按照完成事件須要的步驟去調用其每一個步驟的實現方法。每一個步驟的具體實現,由子類完成)。後面咱們的代碼分析也以JmsTemplate爲核心進行分析。對於相似Java EE的消息驅動Bean形式的異步接收,Spring提供了大量用於建立消息驅動POJOs的消息監聽器。Spring還提供了一種建立消息監聽器的聲明式方法。
根據《Spring 5 官方文檔》:
(1)org.springframework.jms:定義了各類不一樣的JmsException異常類。在org.springframework.jms.support.JmsUtils的convertJmsAccessException方法中將javax.jms.JMSException異常類轉成成等價的org.springframework.jms.JmsException。
(2)org.springframework.jms.annotation包提供了支持註解驅動監聽端點的必要基礎架構,經過使用@JmsListener實現。
(3)org.springframework.jms.config包提供了 JMS 命名空間的解析實現,以及配置監聽容器和建立監聽端點的 java 配置支持。
(4)org.springframework.jms.connection包提供了適用於獨立應用程序的ConnectionFactory實現。 它還包含 Spring 對 JMS 的PlatformTransactionManager實現(即JmsTransactionManager)。這將容許 JMS 做爲事務性資源無縫集成到 Spring 的事務管理機制中。
(5)org.springframework.jms.core包提供了使用 JMS 的核心功能。它包含了JMS 模板類,用來處理資源的建立與釋放,從而簡化 JMS 的使用,就像JdbcTemplate對 JDBC 作的同樣。
(6)org.springframework.jms.listener:提供了消息監聽器及相關支持類。
(7)org.springframework.jms.remoting:提供基於JMS的RPC方案。
(8)org.springframework.jms.support包:提供了一些支持的功能函數。converter子包提供了 MessageConverter 抽象,進行 Java 對象和 JMS 消息的互相轉換。destination子包提供了管理 JMS 目的地的不一樣策略,好比針對 JNDI 中保存的目標的服務定位器。
2、jms模塊源碼文件清單
1 jms/
1.1 JmsException:繼承自NestedRuntimeException,NestedRuntimeException又繼承自RuntimeException。
Java中全部異常的父類是Throwable類,在Throwable類下有兩大子類:一個是Error類,指系統錯誤異常,例如:VirtualMachineError 虛擬機錯誤,ThreadDeath 線程死鎖。通常若是是Error類的異常的話,就是程序的硬傷,就比如是工廠裏斷水斷電,機器損壞了。另外一個是Exception類,指編碼、環境、用戶操做輸入等異常,這個是比較常見的異常類,Exception類下面又有兩個子類,非檢查異常(又稱運行時異常RuntimeException)和檢查異常。
在RuntimeException異常中有幾個常見的子類,例如:InputMismatchException 輸入不匹配異常;ArithmeticException 算術運算異常;NullPointerException 空指針異常;ArrayIndexOutOfBoundsException 數組下標越界異常;ClassCastException 類型轉換異常。
檢查異常中的子類有:IOException 文件異常;SQLException SQL數據庫錯誤異常。
1.2 IllegalStateException
1.3 InvalidClientIDException
1.4 InvalidDestinationException
1.5 InvalidSelectorException
1.6 JmsSecurityException
1.7 MessageEOFException
1.8 MessageFormatException
1.9 MessageNotReadableException
1.10 MessageNotWriteableException
1.11 ResourceAllocationException
1.12 TransactionInProgressException
1.13 TransactionRolledBackException
1.14 UncategorizedJmsException
1.2-1.14的異常處理類都繼承自JmsException,適用場景如類名所示,其中UncategorizedJmsException表示當其餘JmsException都匹配不到時拋出該異常。org.springframework.jms.support.JmsUtils的convertJmsAccessException方法負責將javax.jms.JMSException異常類轉成成等價的org.springframework.jms.JmsException。
2 jms/annotation
2.1 JmsListener:該類是一個註解接口。java用@interface Annotation{ } 定義一個註解 @Annotation,一個註解是一個類。註解至關於一種標記,在程序中加上了註解就等於爲程序加上了某種標記,之後javac編譯器,開發工具和其餘程序能夠用反射來了解你的類以及各類元素上有無任何標記,看你有什麼標記,就去幹相應的事。
@JmsListener註解用來聲明這是個監聽器方法,也就是標記這個方法被JMS消息監聽器監聽。該類中屬性destination表示監聽的隊列名字,containerFactory表示用來建立JMS監聽器容器。處理@JmsListener註解主要靠JmsListenerAnnotationBeanPostProcessor。註冊JmsListenerAnnotationBeanPostProcessor能夠手動進行,更便捷的是經過Spring的config文件<jms:annotation-driven/>配置,或者使用@EnableJms註解兩種方式將註解的監聽器類自動放到監聽器容器中。
2.2 EnableJms:用@JmsListener這個註解的時候。須要在配置類(@Configuration類)上加上@EnableJms註解,而且要配置一個DefaultJmsListenerContainerFactory監聽容器工廠的Bean實例。
Spring根據註解@EnableJms自動掃描帶有@JmsListener的方法,併爲其建立一個MessageListener把它包裝起來。而JmsListenerContainerFactory的Bean的做用就是爲每一個MessageListener建立MessageConsumer並啓動消息接收循環。
Spring接收消息的步驟:經過JmsListenerContainerFactory配合@EnableJms掃描全部@JmsListener方法,自動建立MessageConsumer、MessageListener以及線程池,啓動消息循環接收處理消息,最終由咱們本身編寫的@JmsListener方法處理消息,可能會由多線程同時併發處理。
2.3 JmsListenerAnnotationBeanPostProcessor:該後置處理器用來實現@JmsListener註解,將帶有@JmsListener方法註冊到指定的JMS消息監聽器容器中。該類中afterSingletonsInstantiated方法的最關鍵的一句 registrar.afterPropertiesSet()便可完成全部監聽的註冊。這個後置處理器能夠經過 <jms:annotation-driven> XML配置或者@EnableJms註解兩種方式自動註冊。
2.4 JmsBootstrapConfiguration:配置類,註冊一個用於處理@JmsListener註解的JmsListenerAnnotationBeanPostProcessor後置處理器。同時也註冊一個默認的JmsListenerEndpointRegistry。當使用@EnableJms註解時,這個配置類會被自動載入。
2.5 JmsListenerConfigurer:Spring管理bean實現的可選接口,這些管理bean用來自定義JMS監聽器端點的配置方式。
2.6 JmsListeners:註解容器,多個@JmsListener註解的組成的集合。
3 jms/config
3.1 JmsListenerContainerFactory:消息監聽容器工廠接口,基於JmsListenerEndpoint。
3.2 AbstractJmsListenerContainerFactory:消息監聽容器工廠的抽象基類。
3.3 DefaultJmsListenerContainerFactory:JmsListenerContainerFactory接口的默認實現,該工廠用來建立DefaultMessageListenerContainer。
3.4 DefaultJcaListenerContainerFactory:JmsListenerContainerFactory接口的一個實現,用來建立一個基於 JCA的MessageListener容器JmsMessageEndpointManager。
JCA (J2EE 鏈接器架構,Java Connector Architecture)是對J2EE標準集的重要補充。由於它注重的是將Java程序鏈接到非Java程序和軟件包中間件的開發。
3.5 SimpleJmsListenerContainerFactory:JmsListenerContainerFactory接口的一個簡單實現,用來建立一個標準的SimpleMessageListenerContainer。
3.6 JmsNamespaceHandler:JMS命名空間處理器。註冊了三種標籤元素對應的處理函數:"listener-container"、"jca-listener-container"、"annotation-driven"。
3.7 AbstractListenerContainerParser:用來解析JMS監聽器容器元素。
3.8 JmsListenerContainerParser:解析JMS的<listener-container>元素。
3.9 JcaListenerContainerParser:解析JMS的<jca-listener-container>元素。
3.10 AnnotationDrivenJmsBeanDefinitionParser:解析jms名字空間中的 'annotation-driven' 元素。
3.11 AbstractJmsListenerEndpoint:Jms監聽端點的基礎模型。
3.12 JmsListenerEndpoint:JMS listener endpoint的模型。藉助JmsListenerConfigurer能夠用來註冊端點。
3.13 MethodJmsListenerEndpoint:JmsListenerEndpoint的一個實現,提供了一些方法用來爲該endpoint處理到來的消息。包括get/set 所屬bean、所屬方法、jms相關參數、spring上下文、消息處理工廠等。
3.14 SimpleJmsListenerEndpoint:JmsListenerEndpoint接口的一個實現,提供了MessageListener,用來爲給endpoint處理到來的消息。
3.15 JmsListenerEndpointRegistrar:將JmsListenerEndpoint對象註冊到JmsListenerEndpointRegistry對象中。
3.16 JmsListenerEndpointRegistry:建立MessageListenerContainer實例,用來保存註冊過的JmsListenerEndpoint,同時對這些消息監聽容器的生命週期進行管理。不一樣於手動建立的MessageListenerContainer,經過註冊生成的監聽容器不屬於ApplicationContext管理的bean,不會被自動裝配。
若是須要管理註冊的消息監聽容器則調用getListenerContainers()函數。若是要使用一個指定的消息監聽容器,使用函數getListenerContainer(String),參數就是endpoint的id值。
3.17 JmsListenerConfigUtils:配置常量值,用於子包間的內部共享。
4 jms/connection
4.1 SingleConnectionFactory:connectionFactory是Spring用於建立到JMS服務器連接的。Spring提供了多種connectionFactory,主要有SingleConnectionFactory和CachingConnectionFactory。
SingleConnectionFactory:對於創建JMS服務器連接的請求會一直返回同一個連接,而且會忽略Connection的close方法調用。
4.2 CachingConnectionFactory:繼承自SingleConnectionFactory,因此它擁有SingleConnectionFactory的全部功能,同時它還提供緩存JMS資源功能,包括緩存Session、MessageProducer和MessageConsumer。
Spring中發送消息的核心是JmsTemplate,然而Jmstemplate的問題是在每次調用時都要打開/關閉session和producter,效率很低,因此引伸出了PooledConnectionFactory鏈接池,用於緩存session和producter。然而這還不是最好的。從spring2.5.3版本後,Spring又提供了CachingConnectionFactory,這纔是首選的方案。默認狀況下, CachingConnectionFactory只緩存一個session。
4.3 SmartConnectionFactory:繼承自ConnectionFactory接口,指示從該connectionFactory獲得的Connection怎樣釋放掉。
4.4 DelegatingConnectionFactory:ConnectionFactory接口的實現類,對全部調用給定的目標ConnectionFactory進行代理。
4.5 ConnectionFactoryUtils:ConnectionFactory類的功能函數,特別是用於從一個指定的ConnectionFactory得到transactional JMS resources。主要在框架內部使用,好比JmsTemplate、DefaultMessageListenerContainer會使用到該類。
4.6 CachedMessageConsumer:MessageConsumer的裝飾器,使一個共享的MessageConsumer實例能適應全部的調用。
4.7 CachedMessageProducer:MessageProducer裝飾器,使得一個共享的MessageProducer實例能適應多數調用。
4.8 ChainedExceptionListener:ExceptionListener接口的實現類,支持異常鏈。在java代碼中經常會再捕獲一個異常後拋出另一個異常,而且但願把異常原始信息保存下來,這被稱爲異常鏈。
4.9 JmsResourceHolder:JmsResourceHolder繼承了ResourceHolderSupport,做爲Jms資源句柄,封裝了JMS的connection、session等資源。
4.10 JmsTransactionManager:JmsTransactionManager用於對JMS ConnectionFactory作事務管理。這將容許JMS應用利用Spring的事務管理特性。JmsTransactionManager在執行本地資源事務管理時將從指定的ConnectionFactory綁定一個Connection/Session這樣的配對到線程中。JmsTemplate會自動檢測這樣的事務資源,並對它們進行相應操做。
4.11 SessionProxy:繼承自Session的接口,被Session代理實現,用來得到該代理的目標Session。
4.12 SynchedLocalTransactionFailedException:同步本地事務未完成時拋出的異常。
4.13 TransactionAwareConnectionFactoryProxy:ConnectionFactory的代理,添加了Spring的事務功能,同事務JNDI ConnectionFactory相似。
4.14 UserCredentialsConnectionFactoryAdapter:ConnectionFactory的一個適配器,授予用戶對於每一個標準的 createConnection()方法調用的權限。
5 jms/core
5.1 JmsMessageOperations:繼承了MessageSendingOperations、MessageReceivingOperations、MessageRequestReplyOperations幾個接口,包含關於JMS消息的操做方法,包括send、receive、convertAndSend、receiveAndConvert等。
5.2 JmsMessagingTemplate:JmsMessageOperations接口的一個實現。
5.3 JmsOperations:詳細列出JMS一系列操做,該接口會被JmsTemplate實現。
5.4 JmsTemplate:核心類。在JDBC中,Spring提供了一個JdbcTemplate來簡化JDBC代碼開發,一樣,Spring也提供了JmsTemplate來簡化JMS消息處理的開發。
JmsTemplate實際上是Spring對JMS更高一層的抽象,它封裝了大部分建立鏈接、獲取session及發送接收消息相關的代碼,使得咱們能夠把精力集中在消息的發送和接收上。
5.5 MessageCreator:利用給定的Session建立一個JMS消息。
5.6 MessagePostProcessor:和JmsTemplate的send方法一塊兒用,將一個對象轉換成message。在一個消息被轉換器處理後能夠進行進一步修改。在設置JMS頭部和屬性的時候有用。
5.7 BrowserCallback:瀏覽JMS queue中的信息的回調函數。在JmsTemplate類中的browse、browseSelected函數中BrowserCallback做爲一個參數傳入。(有些函數要求應用先傳給它一個函數,好在合適的時候調用,以完成目標任務。這個被傳入的、後又被調用的函數就稱爲回調函數callback function)。
5.8 ProducerCallback:send一個消息到JMS destination的回調函數。做爲JmsTemplate類的execute函數一個參數。
5.9 SessionCallback:在一個給定的Session執行一系列操做的回調函數。做爲JmsTemplate的execute函數的參數使用。
jms/core/support
5.10 JmsGatewaySupport:方便應用類訪問JMS。使用該類時須要設置一個ConnectionFactory 或者JmsTemplate實例。若是存在ConnectionFactory ,那麼它經過createJmsTemplate方法會建立本身的JmsTemplate。
6 jms/listener
6.1 AbstractJmsListeningContainer:繼承自JmsDestinationAccessor,做爲全部Message Listener Container的公共基類。它主要提供了JMS connection的生命週期管理的功能,可是沒有對消息接收的方式(主動接收方式或者異步接收方式)等作任何假定。
6.2 MessageListenerContainer:框架內部使用的一個抽象類,用來表示一個消息監聽器容器,不會被用來支持JMS和JCA模式的外部容器實現。
6.3 AbstractMessageListenerContainer: Spring消息監聽器容器(如SimpleMessageListenerContainer、SimpleMessageListenerContainer)的父類。
6.4 AbstractPollingMessageListenerContainer:繼承自AbstractMessageListenerContainer,它提供了對於主動接收消息(polling)的支持,以及支持外部的事務管理。
6.5 SimpleMessageListenerContainer:最簡單的消息監聽器容器,用來從jms 消息隊列中接收消息,而後推送註冊到它內部的消息監聽器(MessageListener)中,只能處理固定數量的JMS會話,且不支持事務。
在Spring框架中使用JMS傳遞消息有兩種方式:JMS template和message listener container,前者用於同步收發消息,後者主要用於異步收消息。
6.6 DefaultMessageListenerContainer:用於異步消息監聽的消息監聽器容器。跟SimpleMessageListenerContainer同樣,DefaultMessageListenerContainer也支持建立多個Session和MessageConsumer來接收消息。跟SimpleMessageListenerContainer不一樣的是,DefaultMessageListenerContainer建立了concurrentConsumers所指定個數的AsyncMessageListenerInvoker(實現了SchedulingAwareRunnable接口),並交給taskExecutor運行。
6.7 LocallyExposedJmsResourceHolder:JMS資源句柄JmsResourceHolder的子類,指示本地的資源。
6.8 SessionAwareMessageListener:SessionAwareMessageListener是Spring爲咱們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收消息的,假如咱們在使用MessageListener處理接收到的消息時咱們須要發送一個消息通知對方咱們已經收到這個消息了,那麼這個時候咱們就須要在代碼裏面去從新獲取一個Connection或Session。SessionAwareMessageListener的設計就是爲了方便咱們在接收到消息後發送一個回覆的消息,它一樣爲咱們提供了一個處理接收到的消息的onMessage方法。
6.9 SubscriptionNameProvider:消息監聽器會實現該接口,表示一個持久的訂閱,不然消息監聽器被用做一個默認的訂閱。
jms/listener/adapter
6.10 AbstractAdaptableMessageListener:JMS消息監聽器適配器的抽象類,提供系列方法用來提取JMS消息的有效信息。
6.11 JmsResponse:在運行狀態時,destination須要計算時使用該類,返回JMS監聽器的方法用來指示destination。若是在運行中不須要計算destination,推薦使用org.springframework.messaging.handler.annotation.SendTo @SendTo。
6.12 MessageListenerAdapter:MessageListenerAdapter類實現了MessageListener接口和SessionAwareMessageListener接口,它的主要做用是將接收到的消息進行類型轉換,而後經過反射的形式把它委託給目標監聽器進行處理。MessageListenerAdapter會把接收到的消息作以下轉換:
TextMessage轉換爲String對象;
BytesMessage轉換爲byte數組;
MapMessage轉換爲Map對象;
ObjectMessage轉換爲對應的Serializable對象。
6.13 MessagingMessageListenerAdapter:MessageListener適配器,援引一個可配置的InvocableHandlerMethod(用於在某個請求被控制器方法處理時,包裝處理所需的各類參數和執行處理邏輯)。
6.14 ListenerExecutionFailedException:監聽器方法執行失敗時拋出的異常。
6.15 ReplyFailureException:須要回覆的消息發送失敗拋出的異常。
jms/listener/endpoint
6.16 JmsMessageEndpointFactory:JCA 1.7 MessageEndpointFactory工廠類的一個實現,爲JMS監聽器提供了事務管理能力。
6.17 JmsMessageEndpointManager:GenericMessageEndpointManager的一個拓展,ActivationSpec配置中加入了對JMS的支持。
6.18 JmsActivationSpecFactory:基於JmsActivationSpecConfig用來建立JCA 1.5 ActivationSpec對象的工廠。
6.19 StandardJmsActivationSpecFactory:JmsActivationSpecFactory接口的標準實現,支持JMS 1.5中定義的標準JMS屬性,忽視Spring的"maxConcurrency" 、 "prefetchSize" 設置。
6.20 DefaultJmsActivationSpecFactory: JmsActivationSpecFactory接口的默認實現。支持JCA 1.5中所定義的標準額JMS屬性,也包括Spring拓展的一些設置,如"maxConcurrency" 、 "prefetchSize" 。
6.21 JmsActivationSpecConfig:激活JMS message endpoint的一些通用的配置對象。
7 jms/remoting
7.1 JmsInvokerClientInterceptor:方法攔截器,序列化遠程觸發對象和反序列化遠程觸發結果對象,使用Java序列化方法,例如RMI。
7.2 JmsInvokerProxyFactoryBean:jms觸發代理的工廠bean,暴露bean引用的代理服務,使用特定的服務接口。
7.3 JmsInvokerServiceExporter:爲了支持基於消息的RPC,Spring提供了JmsInvokerServiceExporter,它能夠把bean導出爲基於消息的服務;同時爲客戶端提供了JmsInvokerProxyFactoryBean來使用這些服務。
8 jms/support
8.1 JmsAccessor:定義了幾個用於訪問JMS服務的共通屬性,提供了建立Connection和Session的方法。是JmsTemplate、SimpleMessageListenerContainer和DefaultMessageListenerContainer的父類。
8.2 JmsHeaderMapper:將消息頭整合到要向外發送的JMS消息中的接口,或者從接收到的JMS消息提取出消息頭的信息。
8.3 SimpleJmsHeaderMapper :JmsHeaderMapper接口的簡單實現。
8.4 JmsHeaders:將JMS屬性設置到通用消息頭部或者從其提取出JMS屬性用到的預約義的名字或者前綴。
8.5 JmsMessageHeaderAccessor:MessageHeaderAccessor接口的一個實現,可以訪問JMS規範的頭header。
8.6 JmsUtils:JMS工具包,主要是框架內部使用。
8.7 QosSettings:收集Quality-of-Service設置,在發送消息時使用。
jms/support/converter
8.8 MessageConverter:在收發消息時,將Java objects和JMS messages相互轉換。
8.9 SimpleMessageConverter: 實現String與TextMessage之間的相互轉換,字節數組與BytesMessage之間的相互轉換,Map與MapMessage之間的相互轉換以及Serializable對象與ObjectMessage之間的相互轉換。
8.10 MarshallingMessageConverter:使用JAXB庫實現消息與XML格式之間的相互轉換。
8.11 MessagingMessageConverter:利用MessageConverter將messaging abstraction的Message和javax.jms.Message相互轉換。
8.12 SmartMessageConverter:MessageConverter的拓展,增長了轉換提示功能。
8.13 MappingJackson2MessageConverter:使用Jackson 2 JSON庫實現消息與JSON格式之間相互轉換。
8.14 MessageType:定義幾個常量表示要轉換成得目標消息的類型,有text、bytes、map、object。
8.15 MessageConversionException:MessageConverter出錯時拋出的異常。
jms/support/destination
8.16 DestinationResolver:將指定的目的地名解析爲目的地實例。 參數pubSubDomain用於指定是使用「發佈/訂閱」模式(解析後的目的地是Topic),仍是使用「點對點」模式(解析後的目的地是Queue)。
8.17 BeanFactoryDestinationResolver:實現了DestinationResolver接口和BeanFactoryAware接口。它會根據指定的目的地名從BeanFactory中查找目的地實例。
8.18 DynamicDestinationResolver:實現了DestinationResolver接口。根據指定的目的地名動態建立目的地實例。
8.19 CachingDestinationResolver:繼承了DestinationResolver,增長了緩存的功能,在目的地失效的時候,removeFromCache方法會被調用;在JMS provider失效的時候,clearCache方法會被調用。
8.20 JndiDestinationResolver:繼承自JndiLocatorSupport, 同時實現了CachingDestinationResolver接口。若是在JMS provider中配置了靜態目的地,那麼JndiDestinationResolver經過JNDI查找的方式得到目的地實例。
8.21 JmsDestinationAccessor:提供了用於解析目的地的方法。destinationResolver屬性的默認值是DynamicDestinationResolver的實例,也就是說默認採用動態目的地解析的方式;pubSubDomain屬性用於指定是使用「發佈/訂閱」模式仍是使用「點對點」模式,默認值是false(點對點模式)。
8.22 DestinationResolutionException:將指定的目的地名解析爲目的地實例出錯拋出的異常。
3、jms的使用演示
(一)jms的單獨使用
爲了更好的理解Spring整合jms,先來看下單獨使用Java消息服務的方式。以Java消息服務的開源實現產品ActiveMQ爲例。使用消息服務,須要作三件事:一、開啓消息服務器。二、建立消息生產者。三、建立消息消費者。
一、開啓消息服務器
若是是Windows系統下能夠直接雙擊ActiveMQ安裝目錄下的bin目錄下的activemq.bat文件來啓動消息服務器。若是是Linux系統,進入activeMq安裝包下的bin目錄,使用./activemq start命令就能夠啓動activemq服務。
二、建立消息生產者
消息的生產者主要用來將包含業務邏輯的消息發送到消息服務器。如下爲發送消息測試,嘗試發送三條消息到消息服務器,消息內容爲「你們好,這是個測試」。
1 public class Sender{ 2 public static void main(String[] args) throws Exception{ 3 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 4 Connection connection = connectionFactory.createConnection(); 5 connection.start() 6 7 Session session = connection.creatSession(Boolean.True, Session.AUTO_ACKNOWLEDGE); 8 Destination destination = session.createQueue("my-queue"); 9 10 MessageProducer producer = session.createProducer(destionation); 11 for (int i=0; i<3; i++){ 12 TextMessage message = session.createTextMessage("你們好,這是個測試"); 13 Tread.sleep(1000); 14 //經過消息生產者發出消息 15 producer.send(message); 16 } 17 session.commit(); 18 session.close(); 19 connection.close(); 20 } 21 }
三、建立消息消費者
消息的消費者用於鏈接消息服務器將服務器中的消息提取出來進行相應的處理。
1 public class Receiver{ 2 public static void main(String[] args) throws Exception { 3 ConnectionFactory connectionFactory = new ActiveMQConnectionFactroy(); 4 Connection connection = connectionFactory.createConnection(); 5 connection.start(); 6 7 final Session session = connection.createSession(Boolean.TRUE, Session.AUTOACKNOWLEDGE); 8 Destination destination = session.createQueue("my-queue"); 9 MessageConsumer consumer = session.createConsumer(destination); 10 11 int i = 0; 12 while(i<3){ 13 i++; 14 TextMessage message = (TextMessage) consumer.receive(); 15 session.commit(); 16 //TODO something... 17 System.out.println("收到消息:" + message.getText()); 18 } 19 20 session.close(); 21 connection.close(); 22 } 23 }
運行時,先開啓消息的生產者,向服務器發送消息,而後開啓消息的消費者。上述代碼能夠看出,和數據庫實現很類似,一系列的冗餘可是必不可少的代碼用於建立connectionFactory、connection、session,利用session來 createQueue、createProducer、createConsumer,真正用於發送和接收消息的代碼並很少。
Spring 經過基於模板方法的設計模式來解決這個問題,這個模板就是 JmsTemplate 。JmsTemplate封裝了大部分建立鏈接、獲取session及發送接收消息相關的代碼,使得咱們能夠把精力集中在消息的發送和接收上。因此使用 JmsTemplate可以很是容易地在消息生產方發送隊列和主題消息,在消費消息的一方也可以很是容易地接收這些消息。
(二)Spring整合jms
在Spring中使用jms一樣須要作三件事。一、配置文件的配置。二、發送消息。三、接收消息。
一、配置文件的配置
上面咱們提到了Spring將Connection的建立和關閉,Session的建立和關閉等操做都封裝到了JmsTemplate中,因此在Spring的核心配置文件中首先要註冊JmsTemplate類型的bean。ActiveMQConnectionFactory用於鏈接消息服務器,ActiveMQQueue消息隊列是實際的消息源,也要註冊。
1 <beans> 2 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <Property name="brokerURL"> 4 <value>tcp://localhost:61616</value> 5 </Property> 6 </bean> 7 8 <bean id="jmsTemplate" class="org.Springframework.jms.core.JmsTemplate"> 9 <Property name="connectionFactory"> 10 <ref bean="connectionFactory" /> 11 </Property> 12 </bean> 13 14 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 15 <constructor-arg index="0"> 16 <vaule>HelloWordQueue</vaule> 17 </constructor-arg> 18 </bean> 19 20 </beans>
二、發送消息
Spring中使用jmsTemplate發送消息到消息服務器中去,省去了冗餘的Connection以及Session等的建立和銷燬過程。
1 public class HelloWorldSender{ 2 public static void main(String[] args) throws Exception{ 3 ApplicationContext context = new ClassPathXmlApplicationContext(new string[] {"test/activeMQ/Spring/applicationContext.xml"}); 4 5 JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 6 Destination destination = (Destination) context.getBean("destination"); 7 8 jmsTemplate.send(destination, new MessageCreator(){ 9 public Message createMessage(Session session) throws JMSException{ 10 return session.createTextMessage("你們好,這是個測試"); 11 } 12 }); 13 } 14 }
三、接收消息。
Spring中鏈接服務器接收消息示例以下:
1 public class HelloWorldReceiver{ 2 public static void main(string[] args) throws Exception{ 3 ApplicationContext context = new ClassPathXMLApplicationContext(new String[] {"test/activeMQ/Spring/applicationContext.xml"}); 4 JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 5 Destination destination = (Destination) context.getBean("destination"); 6 7 TextMessage msg = (TextMessage) jmsTemplate.receive(destination); 8 System.out.println("received msg is:" + msg.getText()); 9 } 10 }
通過上面3步就完成了Spring消息的發送和接收。在HelloWorldSender發送消息類中使用jmsTemplate.send方法來發送消息,沒有問題。在HelloWorldReceiver接收消息類中,使用jmsTemplate.receive方法來接收消息會存在一個問題,該方法只能接收一次消息,若是未收到消息則一直等待。利用消息監聽器來解決這個問題,消息監聽器能夠循環監聽消息服務器上的消息。消息監聽器並不是Spring獨有, Spring整合JMS的應用提供三種類型的消息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。
MessageListener是最原始的消息監聽器,它是JMS規範中定義的一個接口。其中定義了一個用於處理接收到的消息的onMessage方法,該方法只接收一個Message參數。
SessionAwareMessageListener是Spring爲咱們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收消息的,假如咱們在使用MessageListener處理接收到的消息時咱們須要發送一個消息通知對方咱們已經收到這個消息了,那麼這個時候咱們就須要在代碼裏面去從新獲取一個Connection或Session。SessionAwareMessageListener的設計就是爲了方便咱們在接收到消息後發送一個回覆的消息,它一樣爲咱們提供了一個處理接收到的消息的onMessage方法,可是這個方法能夠同時接收兩個參數,一個是表示當前接收到的消息Message,另外一個就是能夠用來發送消息的Session對象。
MessageListenerAdapter類實現了SessionAwareMessageListener接口和MessageListener接口,它的主要做用是將接收到的消息進行類型轉換,而後經過反射的形式把它交給一個普通的Java類進行處理。TextMessage轉換爲String對象;BytesMessage轉換爲byte數組;MapMessage轉換爲Map對象;ObjectMessage轉換爲對應的Serializable對象。
四、利用消息監聽器接收消息(第3步改進版)。
咱們須要作兩步,第一建立一個消息監聽器,第二爲了使用消息監聽器,修改配置文件。
第一, 咱們先來建立一個消息監聽器:
1 public class MyMessageListener implements MessageListener{ 2 3 @Override 4 public void onMessage(Message arg0){ 5 TextMessage msg = (TextMessage) arg0; 6 try{ 7 System.out.println(msg.getText()); 8 } catch (JMSException e){ 9 e.printStackTrace(); 10 } 11 } 12 }
一旦有新消息Spring會將消息引導至消息監聽器以方便用戶進行相應的邏輯處理。
第二,修改配置文件。
爲了使用消息監聽器,須要在配置文件中註冊消息監聽器容器,並將消息監聽器注入到消息監聽器容器中。
1 <beans> 2 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <Property name="brokerURL"> 4 <value>tcp://localhost:61616</value> 5 </Property> 6 </bean> 7 8 <bean id="jmsTemplate" class="org.Springframework.jms.core.JmsTemplate"> 9 <Property name="connectionFactory"> 10 <ref bean="connectionFactory" /> 11 </Property> 12 </bean> 13 14 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 15 <constructor-arg index="0"> 16 <vaule>HelloWordQueue</vaule> 17 </constructor-arg> 18 </bean> 19 20 <bean id="myTextListener" class="test.activeMQ.Spring.MyMessageListener" /> 21 22 <bean id="javaConsumer" class="org.Springframework.jms.listener.DefaultMessageListenerContainer"> 23 <property name="ConnectionFactory" ref="connectionFactory" /> 24 <Property name="destination" ref="destination" /> 25 <Property name="messageListener" ref="myTextListener" /> 26 </bean> 27 28 </beans>
經過以上的修改配置就能夠進行消息的監聽功能了,一旦有消息傳入消息服務器,則會被消息監聽器監聽到,並由Spring將消息內容引導至消息監聽器的處理函數中等待用戶的進一步邏輯處理。
4、Spring中jms模塊核心源碼分析
從第三節能夠看出,Spring中使用JmsTemplate模板類來進行發送消息和接收消息操做,接收消息可使用消息監聽器的方法來替代模板方法。因此Spring中jms模塊核心主要有兩個:JmsTemplate和消息監聽器。
(一)JmsTemplate發送消息
咱們先來看JmsTemplate,在上面使用示例中,使用JmsTemplate發送消息的函數爲:
1 jmsTemplate.send(destination, new MessageCreator(){ 2 public Message createMessage(Session session) throws JMSException{ 3 return session.createTextMessage("你們好,這是個測試"); 4 } 5 });
進入JmsTemplate類的send函數:
1 public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { 2 execute(session -> { 3 doSend(session, destination, messageCreator); 4 return null; 5 }, false); 6 }
(1)通用代碼的抽取
調用了該類中的execute函數,繼續進入execute函數查看源碼邏輯:
1 public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException { 2 Assert.notNull(action, "Callback object must not be null"); 3 Connection conToClose = null; 4 Session sessionToClose = null; 5 try { 6 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 7 obtainConnectionFactory(), this.transactionalResourceFactory, startConnection); 8 if (sessionToUse == null) { 9 //建立connection 10 conToClose = createConnection(); 11 //根據connection建立session 12 sessionToClose = createSession(conToClose); 13 //是否開啓向服務推送鏈接信息,只有接收信息時須要,發送時不須要 14 if (startConnection) { 15 conToClose.start(); 16 } 17 sessionToUse = sessionToClose; 18 } 19 if (logger.isDebugEnabled()) { 20 logger.debug("Executing callback on JMS Session: " + sessionToUse); 21 } 22 //調用回調函數 23 return action.doInJms(sessionToUse); 24 } 25 catch (JMSException ex) { 26 throw convertJmsAccessException(ex); 27 } 28 finally { 29 //關閉session 30 JmsUtils.closeSession(sessionToClose); 31 //釋放鏈接 32 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); 33 } 34 }
execute函數封裝建立Connection、建立Session、關閉Session和關閉Connection等操做,這些代碼都是發送消息都要作的工做,沒有差別,execute方法幫助咱們抽離了這些冗餘代碼是咱們更加專一業務邏輯的實現。作完這些通用的操做後,經過調用回調函數將程序引入用戶自定義實現的個性化處理。Spring使用execute方法封裝了冗餘代碼,而將個性化的代碼實現放在了回調函數action.doInJms(sessionToUse)中。
(2)發送消息的實現
咱們繼續看回調函數action.doInJms(sessionToUse)。在發送消息的功能中回調函數經過局部類實現。
1 new SessionCallback<Object>(){ 2 public Object doInJms(Session session) throws JMSException { 3 doSend(session, destination, messageCreator); 4 return null; 5 } 6 }
此時的發送邏輯轉向了doSend方法,咱們只須要關注該方法:
1 protected void doSend(Session session, Destination destination, MessageCreator messageCreator) 2 throws JMSException { 3 4 Assert.notNull(messageCreator, "MessageCreator must not be null"); 5 MessageProducer producer = createProducer(session, destination); 6 try { 7 Message message = messageCreator.createMessage(session); 8 if (logger.isDebugEnabled()) { 9 logger.debug("Sending created message: " + message); 10 } 11 doSend(producer, message); 12 // Check commit - avoid commit call within a JTA transaction. 13 if (session.getTransacted() && isSessionLocallyTransacted(session)) { 14 // Transacted session created by this template -> commit. 15 JmsUtils.commitIfNecessary(session); 16 } 17 } 18 finally { 19 JmsUtils.closeMessageProducer(producer); 20 } 21 }
1 protected void doSend(MessageProducer producer, Message message) throws JMSException { 2 if (this.deliveryDelay >= 0) { 3 producer.setDeliveryDelay(this.deliveryDelay); 4 } 5 if (isExplicitQosEnabled()) { 6 producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); 7 } 8 else { 9 producer.send(message); 10 } 11 }
最終的目標仍是經過MessageProducer的send來發送消息。
(二)JmsTemplate接收消息
在上面使用示例中,使用JmsTemplate接收消息的函數爲:TextMessage msg = (TextMessage) jmsTemplate.receive(destination);咱們進入jmsTemplate類的receive函數:
1 public Message receive(Destination destination) throws JmsException { 2 return receiveSelected(destination, null); 3 }
繼續進入JmsTemplate類的receiveSelected函數:
1 public Message receiveSelected(final Destination destination, @Nullable final String messageSelector) throws JmsException { 2 return execute(session -> doReceive(session, destination, messageSelector), true); 3 }
繼續進入JmsTemplate類的doReceive函數:
1 protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector) 2 throws JMSException { 3 4 return doReceive(session, createConsumer(session, destination, messageSelector)); 5 }
1 protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { 2 try { 3 // Use transaction timeout (if available). 4 long timeout = getReceiveTimeout(); 5 ConnectionFactory connectionFactory = getConnectionFactory(); 6 JmsResourceHolder resourceHolder = null; 7 if (connectionFactory != null) { 8 resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); 9 } 10 if (resourceHolder != null && resourceHolder.hasTimeout()) { 11 timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); 12 } 13 Message message = receiveFromConsumer(consumer, timeout); 14 if (session.getTransacted()) { 15 // Commit necessary - but avoid commit call within a JTA transaction. 16 if (isSessionLocallyTransacted(session)) { 17 // Transacted session created by this template -> commit. 18 JmsUtils.commitIfNecessary(session); 19 } 20 } 21 else if (isClientAcknowledge(session)) { 22 // Manually acknowledge message, if any. 23 if (message != null) { 24 message.acknowledge(); 25 } 26 } 27 return message; 28 } 29 finally { 30 JmsUtils.closeMessageConsumer(consumer); 31 } 32 }
其中代碼Message message = receiveFromConsumer(consumer, timeout);中的receiveFromConsumer函數在JmsDestinationAccessor類(package org.springframework.jms.support.destination,JmsTemplate的父類)中定義,咱們進入查看源碼:
1 protected Message receiveFromConsumer(MessageConsumer consumer, long timeout) throws JMSException { 2 if (timeout > 0) { 3 return consumer.receive(timeout); 4 } 5 else if (timeout < 0) { 6 return consumer.receiveNoWait(); 7 } 8 else { 9 return consumer.receive(); 10 } 11 }
實現的方式和發送類似,使用execute函數來封裝冗餘的公共操做,包括建立MessageConsumer,而最終的目標仍是經過MessageConsumer(javax.jms.MessageConsumer包中)的receive來接收消息。
(三)消息監聽器
消息監聽器容器是一個特殊的bean,一旦有消息到達就能夠獲取消息,並經過調用onMessage()方法將消息傳遞給一個消息監聽器MessageListener。Spring提供了兩種消息監聽器容器:
SimpleMessageListenerContainer(package org.springframework.jms.listener):最簡單的消息監聽器容器,只能處理固定數量的JMS會話,且不支持事務。
DefaultMessageListenerContainer(package org.springframework.jms.listener):這個消息監聽器容器創建在SimpleMessageListenerContainer容器之上,添加了對事務的支持。
下面以DefaultMessageListenerContainer爲例進行分析。在上面消息監聽器的使用示例中,須要在配置文件中註冊消息監聽器容器,並將消息監聽器注入到消息監聽器容器中。咱們只有把自定義的消息監聽器注入到消息監聽器容器中,容器纔會把消息轉給消息監聽器進行處理。
DefaultMessageListenerContainer類的繼承關係以下:
DefaultMessageListenerContainer
-- AbstractPollingMessageListenerContainer
-- AbstractMessageListenerContainer
-- AbstractJmsListeningContainer
-- JmsDestinationAccessor
-- JmsAccessor
-- InitializingBean
-- BeanNameAware
-- DisposableBean
-- SmartLifecycle
-- MessageListenerContainer
-- SmartLifecycle
咱們看到DefaultMessageListenerContainer類實現了InitializingBean接口,InitializingBean接口爲bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是繼承該接口的類,在初始化bean的時候都會執行該方法。也就是說spring初始化bean的時候,若是該bean實現了InitializingBean接口,會自動調用afterPropertiesSet方法。DefaultMessageListenerContainer在其父類AbstractJmsListeningContainer中實現了該方法:
1 public void afterPropertiesSet() { 2 //驗證connectionFactory 3 super.afterPropertiesSet(); 4 //驗證配置文件 5 validateConfiguration(); 6 初始化 7 initialize(); 8 }
DefaultMessageListenerContainer監聽器容器的初始化中包含了三句代碼,前兩句用於屬性驗證,好比connectionFactory或者destination等屬性是否爲空等,而真正用於初始化的操做委託在initialize函數中執行:
1 public void initialize() throws JmsException { 2 try { 3 //lifecycleMonitor用於控制生命週期的同步處理 4 synchronized (this.lifecycleMonitor) { 5 this.active = true; 6 this.lifecycleMonitor.notifyAll(); 7 } 8 doInitialize(); 9 } 10 catch (JMSException ex) { 11 synchronized (this.sharedConnectionMonitor) { 12 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); 13 this.sharedConnection = null; 14 } 15 throw convertJmsAccessException(ex); 16 } 17 }
函數中調用了該類的抽象方法doInitialize,該函數實際在其子類DefaultMessageListenerContainer中實現(父類調用抽象方法,該抽象方法由子類實現):
1 protected void doInitialize() throws JMSException { 2 synchronized (this.lifecycleMonitor) { 3 for (int i = 0; i < this.concurrentConsumers; i++) { 4 scheduleNewInvoker(); 5 } 6 } 7 }
concurrentConsumers設置的是對每一個listener在初始化的時候設置的併發消費者的個數,由於在spring中messageListener實例是單例的,spring-jms不能自做主張的建立多個messageListener實例來併發消費。因此spring在內部,建立了多個MessageConsumer實例,並使用consumer.receive()方法以阻塞的方式來獲取消息,當獲取消息後,再執行messageListener.onMessage()方法。concurrentConsumers屬性就是爲了指定spring內部能夠建立MessageConsumer的最大個數,當messageConsumer實例被建立後,將會封裝在一個Runner接口並交給taskExecutor來調度;若是consumer在一直沒有收到消息,則會被置爲「idle」並從consumer列表中移除;若是全部的consumer都處於active狀態,則會建立新的consumer實例直到達到maxConcurrentConsumers個數上限。一般taskExecutor的線程池容量稍大於concurrentConsumer。
咱們繼續上述源碼,doInitialize函數中調用了本類DefaultMessageListenerContainer中的scheduleNewInvoker方法:
1 private void scheduleNewInvoker() { 2 AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker(); 3 if (rescheduleTaskIfNecessary(invoker)) { 4 // This should always be true, since we're only calling this when active. 5 this.scheduledInvokers.add(invoker); 6 } 7 }
其中調用了父類AbstractJmsListeningContainer(package org.springframework.jms.listener)的rescheduleTaskIfNecessary方法:
1 protected final boolean rescheduleTaskIfNecessary(Object task) { 2 if (this.running) { 3 try { 4 doRescheduleTask(task); 5 } 6 catch (RuntimeException ex) { 7 logRejectedTask(task, ex); 8 this.pausedTasks.add(task); 9 } 10 return true; 11 } 12 else if (this.active) { 13 this.pausedTasks.add(task); 14 return true; 15 } 16 else { 17 return false; 18 } 19 }
這裏須要注意的是,子類DefaultMessageListenerContainer調用了父類AbstractJmsListeningContainer的rescheduleTaskIfNecessary方法,rescheduleTaskIfNecessary方法又調用回子類DefaultMessageListenerContainer的方法doRescheduleTask,即鉤子方法。因此doRescheduleTask方法是在DefaultMessageListenerContainer中定義的。
1 protected void doRescheduleTask(Object task) { 2 Assert.state(this.taskExecutor != null, "No TaskExecutor available"); 3 this.taskExecutor.execute((Runnable) task); 4 }
doRescheduleTask函數實際上是在開啓一個線程執行Runnable。Spring根據concurrentConsumer數量創建了對應數量的線程,而每個線程都做爲一個獨立的接收者在循環接收消息。
如今回到DefaultMessageListenerContainer的scheduleNewInvoker方法。咱們上面介紹過DefaultMessageListenerContainer建立了concurrentConsumers所指定個數的AsyncMessageListenerInvoker(實現了SchedulingAwareRunnable接口),並交給taskExecutor運行。咱們重點關注AsyncMessageListenerInvoker類(該類是DefaultMessageListenerContainer的一個內部類)。它是做爲一個Runnable去執行,咱們看下其run方法:
1 public void run() { 2 //併發控制 3 synchronized (lifecycleMonitor) { 4 activeInvokerCount++; 5 lifecycleMonitor.notifyAll(); 6 } 7 boolean messageReceived = false; 8 try { 9 //根據每一個任務設置的最大處理消息數量而作不一樣的處理 10 //小於0默認爲無限制,一直能接収消息 11 if (maxMessagesPerTask < 0) { 12 messageReceived = executeOngoingLoop(); 13 } 14 else { 15 int messageCount = 0; 16 //消息數量控制,一旦超出數量則中止循環 17 while (isRunning() && messageCount < maxMessagesPerTask) { 18 messageReceived = (invokeListener() || messageReceived); 19 messageCount++; 20 } 21 } 22 } 23 catch (Throwable ex) { 24 //清理操做,包括關閉session等 25 clearResources(); 26 if (!this.lastMessageSucceeded) { 27 // We failed more than once in a row or on startup - 28 // wait before first recovery attempt. 29 waitBeforeRecoveryAttempt(); 30 } 31 this.lastMessageSucceeded = false; 32 boolean alreadyRecovered = false; 33 synchronized (recoveryMonitor) { 34 if (this.lastRecoveryMarker == currentRecoveryMarker) { 35 handleListenerSetupFailure(ex, false); 36 recoverAfterListenerSetupFailure(); 37 currentRecoveryMarker = new Object(); 38 } 39 else { 40 alreadyRecovered = true; 41 } 42 } 43 if (alreadyRecovered) { 44 handleListenerSetupFailure(ex, true); 45 } 46 } 47 finally { 48 synchronized (lifecycleMonitor) { 49 decreaseActiveInvokerCount(); 50 lifecycleMonitor.notifyAll(); 51 } 52 if (!messageReceived) { 53 this.idleTaskExecutionCount++; 54 } 55 else { 56 this.idleTaskExecutionCount = 0; 57 } 58 synchronized (lifecycleMonitor) { 59 if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { 60 // We're shutting down completely. 61 scheduledInvokers.remove(this); 62 if (logger.isDebugEnabled()) { 63 logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size()); 64 } 65 lifecycleMonitor.notifyAll(); 66 clearResources(); 67 } 68 else if (isRunning()) { 69 int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); 70 if (nonPausedConsumers < 1) { 71 logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " + 72 "Check your thread pool configuration! Manual recovery necessary through a start() call."); 73 } 74 else if (nonPausedConsumers < getConcurrentConsumers()) { 75 logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " + 76 "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " + 77 "to be triggered by remaining consumers."); 78 } 79 } 80 } 81 } 82 }
上面函數根據maxMessagesPerTask(每一個任務設置的最大處理消息數量)值的不一樣,分開進行了處理。若是是無限值,執行函數executeOngoingLoop;若是不是,控制接收消息數量,一旦超出數量則中止循環,同時能夠經過設置標誌位running來控制消息接收的暫停與恢復,核心代碼就是invokeListener()。咱們也看一下executeOngoingLoop代碼:
1 private boolean executeOngoingLoop() throws JMSException { 2 boolean messageReceived = false; 3 boolean active = true; 4 while (active) { 5 synchronized (lifecycleMonitor) { 6 boolean interrupted = false; 7 boolean wasWaiting = false; 8 //若是當前任務已經處於激活狀態可是卻給了暫時終止的命令 9 while ((active = isActive()) && !isRunning()) { 10 if (interrupted) { 11 throw new IllegalStateException("Thread was interrupted while waiting for " + 12 "a restart of the listener container, but container is still stopped"); 13 } 14 if (!wasWaiting) { 15 //若是並不是處於等待狀態則說明是第一次執行,須要將激活任務數量減小 16 decreaseActiveInvokerCount(); 17 } 18 //開始進入等待狀態,等待任務的恢復命令 19 wasWaiting = true; 20 try { 21 //經過wait等待,也就是等待notify或者notifyAll 22 lifecycleMonitor.wait(); 23 } 24 catch (InterruptedException ex) { 25 // Re-interrupt current thread, to allow other threads to react. 26 Thread.currentThread().interrupt(); 27 interrupted = true; 28 } 29 } 30 if (wasWaiting) { 31 activeInvokerCount++; 32 } 33 if (scheduledInvokers.size() > maxConcurrentConsumers) { 34 active = false; 35 } 36 } 37 //正常處理流程 38 if (active) { 39 messageReceived = (invokeListener() || messageReceived); 40 } 41 } 42 return messageReceived; 43 }
上面函數中線程等待不是單純採用while循環來控制,由於若是單純採用while循環會浪費CPU的始終週期,給資源形成巨大的浪費。這裏採用的使用全局控制變量lifecycleMonitor的wait()方法來暫停線程。因此,若是終止線程須要再次恢復的話,除了更改this.running標誌位外,還須要調用lifecycleMonitor.notify或者lifecycleMonitor.notifyAll來使線程恢復。
從上述代碼中能夠看出其核心執行流程也是invokeListener()。因此內部類AsyncMessageListenerInvoker的run方法中核心的處理就是調用invokeListener來接收消息並激活消息監聽器。
1 private boolean invokeListener() throws JMSException { 2 this.currentReceiveThread = Thread.currentThread(); 3 try { 4 //初始化資源包括首次建立的時候建立session和consumer 5 initResourcesIfNecessary(); 6 boolean messageReceived = receiveAndExecute(this, this.session, this.consumer); 7 //改變標誌位,信息成功處理 8 this.lastMessageSucceeded = true; 9 return messageReceived; 10 } 11 finally { 12 this.currentReceiveThread = null; 13 } 14 }
上述函數調用了receiveAndExecute函數,該函數在DefaultMessageListenerContainer的父類AbstractPollingMessageListenerContainer(package org.springframework.jms.listener)給出了:
1 protected boolean receiveAndExecute( 2 Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) 3 throws JMSException { 4 5 if (this.transactionManager != null) { 6 // Execute receive within transaction. 7 TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition); 8 boolean messageReceived; 9 try { 10 messageReceived = doReceiveAndExecute(invoker, session, consumer, status); 11 } 12 catch (JMSException | RuntimeException | Error ex) { 13 rollbackOnException(this.transactionManager, status, ex); 14 throw ex; 15 } 16 this.transactionManager.commit(status); 17 return messageReceived; 18 } 19 20 else { 21 // Execute receive outside of transaction. 22 return doReceiveAndExecute(invoker, session, consumer, null); 23 } 24 }
在介紹消息監聽器容器的分類時,已經介紹DefaultMessageListenerContainer消息監聽器容器創建在SimpleMessageListenerContainer容器之上,添加了對事務的支持。若是用戶配置了this.transactionManage也就是配置了事務,那麼,消息的接收會被控制在事務以內,一旦出現任何異常都會被回滾,而回滾操做也會交由事務管理器同一處理。
上面函數調用了doReceiveAndExecute(),doReceiveAndExecute包含了整個消息的接收處理過程,咱們看下其代碼:
1 protected boolean doReceiveAndExecute(Object invoker, @Nullable Session session, 2 @Nullable MessageConsumer consumer, @Nullable TransactionStatus status) throws JMSException { 3 4 Connection conToClose = null; 5 Session sessionToClose = null; 6 MessageConsumer consumerToClose = null; 7 try { 8 Session sessionToUse = session; 9 boolean transactional = false; 10 if (sessionToUse == null) { 11 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 12 obtainConnectionFactory(), this.transactionalResourceFactory, true); 13 transactional = (sessionToUse != null); 14 } 15 if (sessionToUse == null) { 16 Connection conToUse; 17 if (sharedConnectionEnabled()) { 18 conToUse = getSharedConnection(); 19 } 20 else { 21 conToUse = createConnection(); 22 conToClose = conToUse; 23 conToUse.start(); 24 } 25 sessionToUse = createSession(conToUse); 26 sessionToClose = sessionToUse; 27 } 28 MessageConsumer consumerToUse = consumer; 29 if (consumerToUse == null) { 30 consumerToUse = createListenerConsumer(sessionToUse); 31 consumerToClose = consumerToUse; 32 } 33 //接收消息 34 Message message = receiveMessage(consumerToUse); 35 if (message != null) { 36 if (logger.isDebugEnabled()) { 37 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 38 consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + 39 sessionToUse + "]"); 40 } 41 //模板方法,當消息接收且在未處理前給子類機會作相應的處理, 42 messageReceived(invoker, sessionToUse); 43 boolean exposeResource = (!transactional && isExposeListenerSession() && 44 !TransactionSynchronizationManager.hasResource(obtainConnectionFactory())); 45 if (exposeResource) { 46 TransactionSynchronizationManager.bindResource( 47 obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); 48 } 49 try { 50 //激活監聽器 51 doExecuteListener(sessionToUse, message); 52 } 53 catch (Throwable ex) { 54 if (status != null) { 55 if (logger.isDebugEnabled()) { 56 logger.debug("Rolling back transaction because of listener exception thrown: " + ex); 57 } 58 status.setRollbackOnly(); 59 } 60 handleListenerException(ex); 61 // Rethrow JMSException to indicate an infrastructure problem 62 // that may have to trigger recovery... 63 if (ex instanceof JMSException) { 64 throw (JMSException) ex; 65 } 66 } 67 finally { 68 if (exposeResource) { 69 TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); 70 } 71 } 72 // Indicate that a message has been received. 73 return true; 74 } 75 else { 76 if (logger.isTraceEnabled()) { 77 logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + 78 "session [" + sessionToUse + "] did not receive a message"); 79 } 80 //接收到空消息的處理 81 noMessageReceived(invoker, sessionToUse); 82 // Nevertheless call commit, in order to reset the transaction timeout (if any). 83 if (shouldCommitAfterNoMessageReceived(sessionToUse)) { 84 commitIfNecessary(sessionToUse, null); 85 } 86 // Indicate that no message has been received. 87 return false; 88 } 89 } 90 finally { 91 JmsUtils.closeMessageConsumer(consumerToClose); 92 JmsUtils.closeSession(sessionToClose); 93 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); 94 } 95 }
上述代碼中咱們重點關注下激活監聽器 doExecuteListener(sessionToUse, message)方法,doExecuteListener方法在AbstractPollingMessageListenerContainer類的父類AbstractMessageListenerContainer(package org.springframework.jms.listener)給出。
1 protected void doExecuteListener(Session session, Message message) throws JMSException { 2 if (!isAcceptMessagesWhileStopping() && !isRunning()) { 3 if (logger.isWarnEnabled()) { 4 logger.warn("Rejecting received message because of the listener container " + 5 "having been stopped in the meantime: " + message); 6 } 7 rollbackIfNecessary(session); 8 throw new MessageRejectedWhileStoppingException(); 9 } 10 11 try { 12 invokeListener(session, message); 13 } 14 catch (JMSException | RuntimeException | Error ex) { 15 rollbackOnExceptionIfNecessary(session, ex); 16 throw ex; 17 } 18 commitIfNecessary(session, message); 19 }
該函數又調用了該類中的invokeListener函數和commitIfNecessary函數:
(1)invokeListener函數
1 protected void invokeListener(Session session, Message message) throws JMSException { 2 Object listener = getMessageListener(); 3 4 if (listener instanceof SessionAwareMessageListener) { 5 doInvokeListener((SessionAwareMessageListener) listener, session, message); 6 } 7 else if (listener instanceof MessageListener) { 8 doInvokeListener((MessageListener) listener, message); 9 } 10 else if (listener != null) { 11 throw new IllegalArgumentException( 12 "Only MessageListener and SessionAwareMessageListener supported: " + listener); 13 } 14 else { 15 throw new IllegalStateException("No message listener specified - see property 'messageListener'"); 16 } 17 }
上述方法又調用了該類中的doInvokeListener方法,繼續查看其代碼:
1 protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message) 2 throws JMSException { 3 4 Connection conToClose = null; 5 Session sessionToClose = null; 6 try { 7 Session sessionToUse = session; 8 if (!isExposeListenerSession()) { 9 // We need to expose a separate Session. 10 conToClose = createConnection(); 11 sessionToClose = createSession(conToClose); 12 sessionToUse = sessionToClose; 13 } 14 // Actually invoke the message listener... 15 listener.onMessage(message, sessionToUse); 16 // Clean up specially exposed Session, if any. 17 if (sessionToUse != session) { 18 if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) { 19 // Transacted session created by this container -> commit. 20 JmsUtils.commitIfNecessary(sessionToUse); 21 } 22 } 23 } 24 finally { 25 JmsUtils.closeSession(sessionToClose); 26 JmsUtils.closeConnection(conToClose); 27 } 28 }
1 protected void doInvokeListener(MessageListener listener, Message message) throws JMSException { 2 listener.onMessage(message); 3 }
經過層層調用,最終提取監聽器並使用listener.onMessage(message);激活了監聽器,也就是激活了用戶自定義的監聽器邏輯。doExecuteListener函數中還有一句重要的代碼commitIfNecessary。
(2)commitIfNecessary函數
AbstractMessageListenerContainer類中的doExecuteListener方法中調用的commitIfNecessary函數。
1 protected void commitIfNecessary(Session session, @Nullable Message message) throws JMSException { 2 // Commit session or acknowledge message. 3 if (session.getTransacted()) { 4 // Commit necessary - but avoid commit call within a JTA transaction. 5 if (isSessionLocallyTransacted(session)) { 6 // Transacted session created by this container -> commit. 7 JmsUtils.commitIfNecessary(session); 8 } 9 } 10 else if (message != null && isClientAcknowledge(session)) { 11 message.acknowledge(); 12 } 13 }
其中又調用了JmsUtils類(package org.springframework.jms.support)的commitIfNecessary(session)函數,咱們進入該函數:
1 public static void commitIfNecessary(Session session) throws JMSException { 2 Assert.notNull(session, "Session must not be null"); 3 try { 4 session.commit(); 5 } 6 catch (javax.jms.TransactionInProgressException | javax.jms.IllegalStateException ex) { 7 // Ignore -> can only happen in case of a JTA transaction. 8 } 9 }
DefaultMessageListenerContainer增長了事務的支持,session.commit()在此完成消息事務的事務提交。告訴消息服務器本地已經正常接收消息,消息服務器接收到本地的事務提交後即可以將此消息刪除。不然,當前消息會被其餘接收者從新接收。
Spring中使用JmsTemplate模板類來進行發送消息和接收消息操做,接收消息可使用消息監聽器的方法來替代模板方法。至此咱們完成了Spring中jms模塊兩個核心JmsTemplate和消息監聽器的分析。
本文參考了 郝佳《Spring源碼深度解析》、《Spring 5 官方文檔》及博客園、CSDN部分文獻。
博衆家之所長,集羣英之薈萃。遴選各IT領域精品雄文!
歡迎關注「IT架構精選」