spring integration fixed-delay的實現

以下配置: spring

<int:service-activator input-channel="channel"  ref="serviceActivator" method="execute"> 線程

    <int:poller receive-timeout="1000" fixed-delay="600" task-executor="taskExecutor" 繼承

        max-messages-per-poll="2" /> 隊列

</int:service-activator> 內存

taskExecutor是系統定義的一個線程池; input

還有一個線程池,線程的名字已「task-scheduler-」開頭,spring integration在初始化的時候建立的,線程數爲10個,每隔600毫秒,往 taskExecutor 提交一個任務;該任務代碼從 input-channel 中讀取數據,執行後續操做; io

taskScheduler(線程池的名字)的初始化,在這個類 DefaultConfiguringBeanFactoryPostProcessor 中,此類實現了 BeanFactoryPostProcessor,擴展了spring的基本的擴展點。 擴展

taskScheduler的實現類爲 ThreadPoolTaskScheduler。 配置

每個poller都是一個PollingConsumer,PollingConsumer繼承自AbstractPollingEndpoint,實現了聲明週期方法;最終會調用 ThreadPoolTaskScheduler 的 schedule 方法,傳遞的參數第一個就是poller任務,第二個是 Trigger,表示調度的策略(對應fixed-delay);poller任務代碼就是往 taskExecutor 中提交任務(參考 AbstractPollingEndpoint$Poller)。 任務的啓動代碼入口在 AbstractPollingEndpoint doStart 方法,而後 調度的處理參考 new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();的相關處理邏輯。 線程池

receive-timeout ,表示 taskExecutor 從 input-channel 獲取數據時,若是隊列爲空要等待 1 秒種。

綜合一下,也就是說,taskExecutor 每秒鐘從隊裏中移除一個元素,而taskScheduler會每600毫秒往隊列中放置一個元素,同時配置中不僅存在一個 poller;因此即便沒有請求到來,taskExecutor的隊列長度也會慢慢增加,直至內存溢出。解決方案:合適配置taskExecutor的對列長度,線程數;配置receive-timeout爲一個較小的值,好比50毫秒;

相關文章
相關標籤/搜索