DefaultMessageListenerContainer是一個用於異步消息監聽的管理類。html
DefaultMessageListenerContainer最簡單的實現邏輯,一個任務執行器,執行任務(即消息監聽)。java
DefaultMessageListenerContainer實現的主要原理是,經過內部初始化創建的一個taskExecutor(默認是SimpleAsyncTaskExecutor)用於執行消息監聽的任務(AsyncMessageListenerInvoker)。spring
這裏默認的任務執行器是SimpleAsyncTaskExecutor,這個執行器的缺點是不會重用鏈接,也就是對於每一個任務都須要新開啓一個線程,執行完任務後會關閉它。若是要優化的話能夠考慮線程池。api
消息監聽的任務被抽象成AsyncMessageListenerInvoker類,這個類實現了Runnable接口,內部run方法實際上是經過不斷循環consumer.recieve()方法來實現監聽。緩存
事實上一個消費者對應了一個AsyncMessageListenerInvoker任務,每一個任務須要一個單獨的線程去執行它。這個AsyncMessageListenerInvoker實例被放在了一個名爲scheduledInvokers的set裏面。session
其實咱們還有一個比較關心的地方是這個DefaultMessageListenerContainer緩不緩存connection,session,consumer。它是根據catchLevel屬性來決定是否緩存connection,session,consumer。默認的catchLevel對應常量CATCH_AUTO,即由配置的外部事務管理器決定。catchLevel級別分別是CATCH_NONE,CATCH_CONNECTION,CATCH_SESSION,CATCH_CONSUMER,分別對應0,1,2,3。我試了下默認的CATCH_AUTO在沒有定義事務管理時值爲 CATCH_CONSUMER,即3。異步
DefaultMessageListenerContainer會根據catchLevel來緩存共享connection,session,及consumer。值爲3的話就會緩存connection,session,及consumer,在初始化的時候就會調用父類AbstractJmsListeningContainer的doStart()方法,判斷cacheLevel是否大於等於1,若是大於就建立一個connection將放入成員變量sharedConnection中。優化
每一個任務被執行的時候(即責任是監聽消息),會先去獲取connection,session及consumer(經過調用initResourcesIfNecessary方法)就像咱們本身最初實現一個簡單的客戶端消費者同樣。只不過這裏會根據catchLevel來決定是否緩存session及consumer。被緩存了的session及consumer放在對應的成員變量裏面。線程
接着任務會想要執行consumer.recieve方法,這以前確定要獲取onnection,session及consumer,若是已有onnection,session及consumer則獲取過來,若是沒有則經過配置的信息新建。執行完consumer.recieve後,會判斷consumer.recieve返回的消息是否爲空。3d
不爲空則調用message對應的messageListner(以前咱們在DefaultMessageListenerContainer中經過方法setMessageListner設置的)的onMessage執行相應的邏輯,並設置這個任務的Idle爲false,代表這個任務不是空閒的,而後會調用方法判斷是否應該新建任務實例,這個受限於MaxConcurrentConsumers及
IdleTaskExecutionLimit
。爲空則不須要特別處理,只需調用noMessageReceived方法將idle標記設爲true。
任務執行完後,會在finally處釋放connection,session及consumer。這個是根據上述講的catchLevel來設置的。
繼承體系以下:
AbstractJmsListeningContainer提供了一個最上層最基礎的jms消息監聽管理類所應該有的方法。提供了start(啓動這個管理類),stop,initialize(初始化這個管理類),establishSharedConnection等。
====================
http://blog.sina.com.cn/s/blog_6592ed330100loqk.html
DefaultMessageListenerContainer繼承自AbstractPollingMessageListenerContainer,主要使用同步的方式接收消息(也就是經過循環調用MessageConsumer.receive的方式接收消息)。該類主要的屬性以下:
跟SimpleMessageListenerContainer同樣,DefaultMessageListenerContainer也支持建立多個Session和MessageConsumer來接收消息。跟SimpleMessageListenerContainer不一樣的是,DefaultMessageListenerContainer建立了concurrentConsumers所指定個數的AsyncMessageListenerInvoker(實現了SchedulingAwareRunnable接口),並交給taskExecutor運行。
maxMessagesPerTask屬性的默認值是Integer.MIN_VALUE,可是若是設置的taskExecutor(默認值是SimpleAsyncTaskExecutor)實現了SchedulingTaskExecutor接口而且其prefersShortLivedTasks方法返回true(也就是說該TaskExecutor傾向於短時間任務),那麼maxMessagesPerTask屬性會自動被設置爲10。
若是maxMessagesPerTask屬性的值小於0,那麼AsyncMessageListenerInvoker.run方法會在循環中反覆嘗試接收消息,並在接收到消息後調用MessageListener(或者SessionAwareMessageListener);若是maxMessagesPerTask屬性的值不小於0,那麼AsyncMessageListenerInvoker.run方法裏最多會嘗試接收消息maxMessagesPerTask次,每次接收消息的超時時間由其父類AbstractPollingMessageListenerContainer的receiveTimeout屬性指定。若是在這些嘗試中都沒有接收到消息,那麼AsyncMessageListenerInvoker的idleTaskExecutionCount屬性會被累加。在run方法執行完畢前會對idleTaskExecutionCount進行檢查,若是該值超過了DefaultMessageListenerContainer.idleTaskExecutionLimit(默認值1),那麼這個AsyncMessageListenerInvoker可能會被銷燬。
全部AsyncMessageListenerInvoker實例都保存在scheduledInvokers中,實例的個數能夠在concurrentConsumers和maxConcurrentConsumers之間浮動。跟SimpleMessageListenerContainer同樣,應該只是在Destination爲Queue的時候才使用多個AsyncMessageListenerInvoker實例。
cacheLevel屬性用於指定是否對JMS資源進行緩存,可選的值是CACHE_NONE = 0、CACHE_CONNECTION = 一、CACHE_SESSION = 二、CACHE_CONSUMER = 3和CACHE_AUTO = 4。默認狀況下,若是transactionManager屬性不爲null,那麼cacheLevel被自動設置爲CACHE_NONE(不進行緩存),不然cacheLevel被自動設置爲CACHE_CONSUMER。
若是cacheLevel屬性值大於等於CACHE_CONNECTION,那麼sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定義)返回true,也就是說使用共享的JMS鏈接。