豔傑。擅長 Python 與 JAVA , 現任餓了麼物流團隊資深 Python 工程師,負責分流核心鏈路, 專一於系統業務分析及穩定性建設。html
上次咱們分享了咱們團隊Java應用Docker化部署GC變長的踩坑經歷,發現還真的幫助不少同窗解決了他們項目中一樣的問題。這對咱們來講真的是很大的一個激勵,因此咱們決定後面會不按期分享一些咱們團隊的踩坑經歷,今天分享的是 Spring-RabbitMQ consumer 的兩個坑,但願能對你們有所幫助。java
spring-rabbit 版本變動至 1.6.2.RELEASEspring
consumer 數量正常,mq 控制面板的 prefetch
參數始終是1, 消息沒法正常 ack, 隊列處於假死狀態, 系統報異常 org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException
bash
[2018-09-09 10:31:27.27]RuntimeException-org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException [ ERROR] [ Elog ]
Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpIllegalStateException: No default listener method specified: Either specify a non-null value for the 'defaultListenerMethod' property or override the 'getListenerMethodName' method.
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:291)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
... 10 more
複製代碼
ListenerExecutionFailedException服務器
根據官方文檔說明,是 consumer 在消息消費時發生了異常, 默認狀況下,該消息會被 reject, 並從新回到隊列中, 但若是異常發生在到達用戶代碼以前的異常,此消息將會一直投遞。 異步
org.springframework.amqp.AmqpIllegalStateExceptionide
該異常拋出,RabbitMQ沒法收到消息迴應,將一直處於等待狀態。post
No default listener method specified: Either specify a non-null value ...fetch
找不到 consumer OnMessage
的方法, 猜想是類加載錯誤,因而從新開始檢查 consumer 類定義,最終發現 rabbit:listener
的 ID 和 真實 consumer 的 ID 衝突,致使真實的 consumer Bean 無效,找不到接受消息的方法,而 rabbitmq 在與 client 端通訊過程當中發生異常,會中止消費。ui
<bean id="consumer_1" class="me.ele.Consumer1"/>
<rabbit:listener-container connection-factory="galaxyConnectionFactory" acknowledge="manual" concurrency="16" >
<rabbit:listener queues="queue_1" ref="consumer_1" id="consumer_1" />
</rabbit:listener-container>
複製代碼
刪除 rabbit:listener
的 ID 屬性
原服務有 6 個隊列,每一個隊列起 10 個 consumer, task-executor
線程池設置爲 90, 後因數據量增長,發現消費能力不足,決定增長 consumer 數量,調整 listener-container 的 concurrency 爲 20,重啓服務器。
部分隊列 consumer 數量不足,缺失項始終爲 xml 中聲明在後的隊列
回退 concurrency 參數爲 10 異常消失,觀察異常現場,發現 consumer 消失隊列有前後順序之分,且 consumer 數量存在上限值爲 90,猜想是收參數限制,檢查配置參數以下:
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
id="taskExecutor">
<!--核心線程數 -->
<property name="corePoolSize" value="16"/>
<!--最大線程數 -->
<property name="maxPoolSize" value="16"/>
<property name="queueCapacity" value="500"/>
<!--線程池維護線程所容許的空閒時間 -->
<property name="keepAliveSeconds" value="60"/>
<!--線程池對拒絕任務(無線程可用)的處理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
<property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>
<rabbit:listener-container connection-factory="monitorConnectionFactory"
acknowledge="manual" task-executor="taskExecutor" prefetch="10" concurrency="20">
<rabbit:listener queues="queue_1" ref="consumer_1"/>
<rabbit:listener queues="queue_2" ref="consumer_2"/>
<rabbit:listener queues="queue_3" ref="consumer_3"/>
<rabbit:listener queues="queue_4" ref="consumer_4"/>
<rabbit:listener queues="queue_5" ref="consumer_5"/>
<rabbit:listener queues="queue_6" ref="consumer_6"/>
</rabbit:listener-container>
複製代碼
經排查對比,發現上限值與 task-executor ThreadPoolTaskExecutor 參數 corePoolSize
、maxPoolSize
極爲接近,查詢相關資料發現,多個 queue 的 consumer 會共用 taskExecutor 的線程池數量,若是線程池數量不足,consumer 沒法建立。 後發現官方文檔已有明確說明.
增大 task-executor corePoolSize
和 maxPoolSize
的值爲 200,重啓服務,解決。
rabbit:listener
), 即便在不一樣文件也須要注意。SimpleAsyncTaskExecutor
做爲異步線程池,每次請求新開線程,沒有最大線程數設置. 其不是真的線程池,這個類不重用線程,每次調用都會建立一個新的線程。可改用 ThreadPoolTaskExecutor
替代,但存在多個 queue 時須要 ThreadPoolTaskExecutor
線程池足夠可用,尤爲針對多個 listener-container 共用一個 task-executor 的狀況。閱讀博客還不過癮?
歡迎你們掃二維碼經過添加羣助手,加入交流羣,討論和博客有關的技術問題,還能夠和博主有更多互動
博客轉載、線下活動及合做等問題請郵件至 shadowfly_zyl@hotmail.com 進行溝通