Spring AMQP 源碼分析MessageListener

### 準備

## 目標

瞭解 Spring AMQP 如何實現異步消息投遞(推模式)html

## 前置知識

《RabbitMQ入門_05_多線程消費同一隊列》java

## 相關資源

Quick Tour for the impatient:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#async-consumer>git

Sample code:<https://github.com/gordonklg/study>,rabbitmq modulegithub

源碼版本:Spring AMQP 1.7.3.RELEASEspring

## 測試代碼

gordon.study.rabbitmq.springamqp.AsyncConsumer.java緩存

 

### 分析

## MessageListener

org.springframework.amqp.core.MessageListener 是 Spring AMQP 異步消息投遞的監聽器接口,它只有一個方法 onMessage,用於處理消息隊列推送來的消息。多線程

 

MessageListener 大概對應 amqp client 中的 Consumer 類。onMessage 方法大概對應 Consumer 類的 handleDelivery 方法。併發

從這也能夠看出,Spring AMQP 的 Message 類至少包含 consumer tag、envelope、basic properties 和 message body 等信息框架

 

## MessageListenerContainer

org.springframework.amqp.rabbit.listener.MessageListenerContainer 能夠看做 message linstener 的容器。但這個 Container 的語義並非指它包含多個 message listener,實際上從方法註釋和實現代碼能夠看出,MessageListenerContainer 只包含一個 MessageListener 。那 Container 的語義是什麼呢?異步

 

一方面,Container 是指雖然只有一個 MessageListener 指定消息消費的邏輯,可是能夠生成多個線程使用相同的 MessageListener 同時消費消息。代碼第19行 setConcurrentConsumers方法就是用來指定併發消費者的數量。能夠把 MessageListenerContainer 當作下圖中的 Subscriber group

 

 

另外一方面,Container 表明生命週期管理的職責。MessageListener 僅僅實現消息消費邏輯,而整個消息消費什麼時候開始、什麼時候結束、如何設置、狀態怎樣等等問題全都是由 MessageListenerContainer(及其實現類)負責的。實際上,MessageListenerContainer 繼承自 SmartLifecycle 接口,該接口是 Spring 容器提供的與生命週期管理相關的接口,實現該接口的類通常狀況下會由 Spring 容器負責啓動與中止。因爲本例沒有啓用 Spring 容器環境,因此代碼第26行須要主動調用 start 方法,消息消費纔會開始執行。

 

## 內部實現思路

咱們知道,amqp client 中的 Consumer 接口實際上只定義了回調方法,咱們在回調方法(主要是 handleDelivery 方法)中實現本身的業務邏輯(對消息的消費)。Consumer 接口的回調方法其實是在一個獨立線程中執行的,當咱們調用 Channel 的 basicConsume 方法時,amqp client 會建立線程處理消息、建立隊列緩存從 broker 推送來的消息。然而這些內部實現並無暴露出來,致使 Spring AMQP 必須本身從新編寫一套相似的實現以得到最大的靈活度。

 

按照前面的分析,咱們能夠想象 Spring AMQP 爲了實現本身的 message listener,須要哪些組件:

  • MessageListenerContainer 的實現類,即 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer。它做爲整個異步消息投遞的核心類存在。
  • 由於 MessageListenerContainer 實際上管理了一個消費者線程組,所以須要相關線程類與線程調度類。Spring AMQP 中該線程類爲 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer,調度類固然就是 SimpleMessageListenerContainer,其start 方法會啓動線程
  • 消息隊列推送過來的消息須要一個本地隊列緩存。
  • 須要實現 amqp client 的 Consumer 接口。在該接口實現類中,咱們簡單的把消息放到本地隊列中。org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer 負責這件事
  • 根據單一職責原則,線程類只負責異步消費者的建立與(無限循環)消息消費;InternalConsumer 只負責實現 amqp client 的 Consumer 接口,與 amqp client 原生的異步消息投遞實現對接,將消息放入本地隊列。那麼,咱們還須要一個真正的異步消費者模型,用來管理消費行爲與狀態。org.springframework.amqp.rabbit.listener.BlockingQueueConsumer 承擔這部分責任。從名字能夠看出,BlockingQueueConsumer 採用 BlockingQueue 做爲本地隊列緩存消息。
  • 用戶的業務邏輯是在 MessageListener 接口中實現的,框架的主要處理過程爲:建立合適的鏈接與信道,從 amqp client 中獲取消息暫存到本地緩存,從本地緩存讀取消息並調用 MessageListener 接口的 onMessage 方法消費消息。

 

## 內部流程分析

SimpleMessageListenerContainer 的 start 方法會根據 int concurrentConsumers 的值建立對應數量的 BlockingQueueConsumer 實例,並放入 Set<BlockingQueueConsumer> consumers 中。接着爲每個 BlockingQueueConsumer 建立對應的消息處理線程 AsyncMessageProcessingConsumer(實現了 Runnable 接口),並經過 Executor taskExecutor = new SimpleAsyncTaskExecutor() 這個自實現的線程池啓動每個 AsyncMessageProcessingConsumer 線程。最後經過判斷每個 AsyncMessageProcessingConsumer 的 FatalListenerStartupException startupException 屬性是否有值來判斷 SimpleMessageListenerContainer 是否正常啓動了全部的消息監聽器。

 

構建 BlockingQueueConsumer 的構造函數參數不少,其中 ConnectionFactory 是代碼第17行建立的 CachingConnectionFactory,AcknowledgeMode 默認爲 AUTO。

org.springframework.amqp.core.AcknowledgeMode 定義了三種確認模式:

  • NONE:不確認,至關於 amqp client 中 Channel.basicConsume 方法中 autoAck 參數值設爲 true
  • MANUAL:用戶經過 channel aware listener 手動控制消息確認
  • AUTO:Spring AMQP 框架根據 message listener 的 onMessage 執行過程當中是否拋出異常來決定是否確認消息消費

 

AsyncMessageProcessingConsumer 的 run 方法比較複雜,粗略解讀一下

  1. 調用 BlockingQueueConsumer 的 start 方法(不是 Runnable 接口)。
  2. start 方法先經過 ConnectionFactoryUtils.getTransactionalResourceHolder 靜態方法建立出供該線程使用的 channel,該方法返回類型是 RabbitResourceHolder。這部分代碼涉及到事務,很複雜,可是本文的測試代碼不涉及事務,目前只要瞭解多個 AsyncMessageProcessingConsumer 會生成多個 RabbitResourceHolder 實例,可是因爲使用了 CachingConnectionFactory 的默認緩存模式,因此這些 RabbitResourceHolder 實例共用同一個(AMQP)鏈接,每一個 AsyncMessageProcessingConsumer 獨享該鏈接建立的一個(AMQP)信道便可
  3. start 方法接着建立 InternalConsumer 實例,並調用剛建立的 AMQP 信道的 basicQos 和  basicConsume 方法開始接受消息。這樣,當隊列接受到消息時,amqp client 會主動調用 InternalConsumer 的 handleDelivery 方法。該方法調用 BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body)); 將消息放到 BlockingQueueConsumer 的 BlockingQueue<Delivery> queue 中。org.springframework.amqp.rabbit.support.Delivery 類封裝了 amqp client 經過  handleDelivery 方法回送過來的全部參數。
    有兩個細節值得說一下:第一,BlockingQueueConsumer 能夠同時消費多個隊列,對每一個隊列,都會調用 basicConsume 方法讓 InternalConsumer 監聽當前隊列(即同一個信道,同一個 Consumer ,不一樣的隊列);其二,能夠經過 ConsumerTagStrategy tagStrategy 設定 Tag 命名規則。
  4. 接着,在 while 循環中調用 SimpleMessageListenerContainer 的 receiveAndExecute 方法,不停的嘗試從 queue 中獲取 Delivery 實例,將之轉化爲 Message,而後執行 MessageListener 的 onMessage 回調方法。
  5. 若是執行成功,則調用 AMQP 信道的 basicAck 方法確認消息消費成功。
  6. 若是執行過程當中發生異常,則將異常轉化爲 ListenerExecutionFailedException 拋出。默認狀況下,Spring AMQP 處理用戶自定義異常的邏輯很是簡單:調用 AMQP 信道的 basicReject 方法將消息退回隊列,打印 warning 級別的日誌,但不會打破 AsyncMessageProcessingConsumer 線程的 while 循環,消息消費繼續進行。這部份內容下篇文章分析。

轉自https://www.cnblogs.com/gordonkong/p/7115155.html

相關文章
相關標籤/搜索