關於持久化和消息的保留見下表:java
消息類型 | 是否持久化 | 是否有Durable訂閱者 | 消費者延遲啓動時,消息是否保留 | Broker重啓時,消息是否保留 |
Queue | N | - | Y | N |
Queue | Y | - | Y | Y |
Topic | N | N | N | N |
Topic | N | Y | Y | N |
Topic | Y | N | N | N |
Topic | Y | Y | Y | Y |
這得從ActiveMQ的儲存機制提及。在一般的狀況下,非持久化消息是存儲在內存中的,持久化消息是存儲在文件中的,它們的最大限制在配置文件的<systemUsage>節點中配置。可是,在非持久化消息堆積到必定程度,內存告急的時候,ActiveMQ會將內存中的非持久化消息寫入臨時文件中,以騰出內存。雖然都保存到了文件裏,但它和持久化消息的區別是,重啓後持久化消息會從文件中恢復,非持久化的臨時文件會直接刪除。緩存
那若是文件增大到達了配置中的最大限制的時候會發生什麼?我作了如下實驗:安全
設置2G左右的持久化文件限制,大量生產持久化消息直到文件達到最大限制,此時生產者阻塞,但消費者可正常鏈接並消費消息,等消息消費掉一部分,文件刪除又騰出空間以後,生產者又可繼續發送消息,服務自動恢復正常。服務器
設置2G左右的臨時文件限制,大量生產非持久化消息並寫入臨時文件,在達到最大限制時,生產者阻塞,消費者可正常鏈接但不能消費消息,或者本來慢速消費的消費者,消費忽然中止。整個系統可鏈接,可是沒法提供服務,就這樣掛了。網絡
具體緣由不詳,解決方案:儘可能不要用非持久化消息,非要用的話,將臨時文件限制儘量的調大。多線程
這得從java的java.net.SocketException異常提及。簡單點說就是當網絡發送方發送一堆數據,而後調用close關閉鏈接以後。這些發送的數據都在接收者的緩存裏,接收者若是調用read方法仍舊能從緩存中讀取這些數據,儘管對方已經關閉了鏈接。可是當接收者嘗試發送數據時,因爲此時鏈接已關閉,因此會發生異常,這個很好理解。不過須要注意的是,當發生SocketException後,本來緩存區中數據也做廢了,此時接收者再次調用read方法去讀取緩存中的數據,就會報Software caused connection abort: recv failed錯誤。併發
經過抓包得知,ActiveMQ會每隔10秒發送一個心跳包,這個心跳包是服務器發送給客戶端的,用來判斷客戶端死沒死。若是你看過上面第一條,就會知道非持久化消息堆積到必定程度會寫到文件裏,這個寫的過程會阻塞全部動做,並且會持續20到30秒,而且隨着內存的增大而增大。當客戶端發完消息調用connection.close()時,會期待服務器對於關閉鏈接的回答,若是超過15秒沒回答就直接調用socket層的close關閉tcp鏈接了。這時客戶端發出的消息其實還在服務器的緩存裏等待處理,不過因爲服務器心跳包的設置,致使發生了java.net.SocketException異常,把緩存裏的數據做廢了,沒處理的消息所有丟失。異步
解決方案:用持久化消息,或者非持久化消息及時處理不要堆積,或者啓動事務,啓動事務後,commit()方法會負責任的等待服務器的返回,也就不會關閉鏈接致使消息丟失了。socket
默認的狀況下,非持久化的消息是異步發送的,持久化的消息是同步發送的,遇到慢一點的硬盤,發送消息的速度是沒法忍受的。可是在開啓事務的狀況下,消息都是異步發送的,效率會有2個數量級的提高。因此在發送持久化消息時,請務必開啓事務模式。其實發送非持久化消息時也建議開啓事務,由於根本不會影響性能。tcp
有時在發送一些消息以後,開啓2個消費者去處理消息。會發現一個消費者處理了全部的消息,另外一個消費者根本沒收到消息。緣由在於ActiveMQ的prefetch機制。當消費者去獲取消息時,不會一條一條去獲取,而是一次性獲取一批,默認是1000條。這些預獲取的消息,在還沒確認消費以前,在管理控制檯仍是能夠看見這些消息的,可是不會再分配給其餘消費者,此時這些消息的狀態應該算做「已分配未消費」,若是消息最後被消費,則會在服務器端被刪除,若是消費者崩潰,則這些消息會被從新分配給新的消費者。可是若是消費者既不消費確認,又不崩潰,那這些消息就永遠躺在消費者的緩存區裏沒法處理。更一般的狀況是,消費這些消息很是耗時,你開了10個消費者去處理,結果發現只有一臺機器吭哧吭哧處理,另外9臺啥事不幹。
解決方案:將prefetch設爲1,每次處理1條消息,處理完再去取,這樣也慢不了多少。
若是你想在消息處理失敗後,不被服務器刪除,還能被其餘消費者處理或重試,能夠關閉AUTO_ACKNOWLEDGE,將ack交由程序本身處理。那若是使用了AUTO_ACKNOWLEDGE,消息是何時被確認的,還有沒有阻止消息確認的方法?有!
消費消息有2種方法,一種是調用consumer.receive()方法,該方法將阻塞直到得到並返回一條消息。這種狀況下,消息返回給方法調用者以後就自動被確認了。另外一種方法是採用listener回調函數,在有消息到達時,會調用listener接口的onMessage方法。在這種狀況下,在onMessage方法執行完畢後,消息纔會被確認,此時只要在方法中拋出異常,該消息就不會被確認。那麼問題來了,若是一條消息不能被處理,會被退回服務器從新分配,若是隻有一個消費者,該消息又會從新被獲取,從新拋異常。就算有多個消費者,每每在一個服務器上不能處理的消息,在另外的服務器上依然不能被處理。難道就這麼退回--獲取--報錯死循環了嗎?
在重試6次後,ActiveMQ認爲這條消息是「有毒」的,將會把消息丟到死信隊列裏。若是你的消息不見了,去ActiveMQ.DLQ裏找找,說不定就躺在那裏。
ActiveMQ:是Apache出品,最流行的,能力強勁的開源消息總線。是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。JMS(Java消息服務):是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。
首先,咱們得大概瞭解下,在哪些狀況下,ActiveMQ服務器會將消息重發給消費者,這裏爲簡單起見,假定採用的消息發送模式爲隊列(即消息發送者和消息接收者)。
① 若是消息接收者在處理完一條消息的處理過程後沒有對MOM進行應答,則該消息將由MOM重發.
② 若是咱們隊某個隊列設置了預讀參數(consumer.prefetchSize),若是消息接收者在處理第一條消息時(沒向MOM發送消息接收確認)就宕機了,則預讀數量的全部消息都將被重發!
③ 若是Session是事務的,則只要消息接收者有一條消息沒有確認,或發送消息期間MOM或客戶端某一方忽然宕機了,則該事務範圍中的全部消息MOM都將重發。
④ 說到這裏,你們可能會有疑問,ActiveMQ消息服務器怎麼知道消費者客戶端究竟是消息正在處理中還沒來得急對消息進行應答仍是已經處理完成了沒有應答或是宕機了根本沒機會應答呢?其實在全部的客戶端機器上,內存中都運行着一套客戶端的ActiveMQ環境,該環境負責緩存發來的消息,負責維持着和ActiveMQ服務器的消息通信,負責失效轉移(fail-over)等,全部的判斷和處理都是由這套客戶端環境來完成的。
咱們能夠來對ActiveMQ的重發策略(Redelivery Policy)來進行自定義配置,其中的配置參數主要有如下幾個:
可用的屬性
屬性 默認值 說明
l collisionAvoidanceFactor 默認值0.15 , 設置防止衝突範圍的正負百分比,只有啓用useCollisionAvoidance參數時才生效。
l maximumRedeliveries 默認值6 , 最大重傳次數,達到最大重連次數後拋出異常。爲-1時不限制次數,爲0時表示不進行重傳。
l maximumRedeliveryDelay 默認值-1, 最大傳送延遲,只在useExponentialBackOff爲true時有效(V5.5),假設首次重連間隔爲10ms,倍數爲2,那麼第二次重連時間間隔爲 20ms,第三次重連時間間隔爲40ms,當重連時間間隔大的最大重連時間間隔時,之後每次重連時間間隔都爲最大重連時間間隔。
l initialRedeliveryDelay 默認值1000L, 初始重發延遲時間
l redeliveryDelay 默認值1000L, 重發延遲時間,當initialRedeliveryDelay=0時生效(v5.4)
l useCollisionAvoidance 默認值false, 啓用防止衝突功能,由於消息接收時是可使用多線程併發處理的,應該是爲了重發的安全性,避開全部併發線程都在同一個時間點進行消息接收處理。全部線程在同一個時間點處理時會發生什麼問題呢?應該沒有問題,只是爲了平衡broker處理性能,不會有時很忙,有時很空閒。
l useExponentialBackOff 默認值false, 啓用指數倍數遞增的方式增長延遲時間。
l backOffMultiplier 默認值5, 重連時間間隔遞增倍數,只有值大於1和啓用useExponentialBackOff參數時才生效。
發送NON_PERSISTENT Message時,消息發送方默認使用異步方式:便是說消息發送後發送方不會等待NON_PERSISTENT Message在服務端的任何回執。那麼問題來了:若是這時服務端已經出現了消息堆積,而且堆積程度已經達到「沒法再接收新消息」的極限狀況了,那麼消息發送方若是知曉並採起相應的策略呢?
實際上所謂的異步發送也並不是絕對的異步,消息發送者會在發送必定大小的消息後等待服務端進行回執(這個配置只是針對使用異步方式進行發送消息的狀況):
// 如下語句設置消息發送者在累計發送102400byte大小的消息後(多是一條消息也多是多條消息) // 等待服務端進行回執,以便肯定以前發送的消息是否被正確處理 // 肯定服務器端是否產生了過量的消息堆積,須要減慢消息生產端的生產速度 connectionFactory.setProducerWindowSize(102400);
若是不特地指定消息的發送類型,那麼消息生產者默認發送PERSISTENT Meaage。這樣的消息發送到ActiveMQ服務端後將被進行持久化存儲,而且消息發送者默認等待ActiveMQ服務端對這條消息處理狀況的回執。
以上這個過程很是耗時,ActiveMQ服務端不但要接受消息,在內存中完成存儲,而且按照ActiveMQ服務端設置的持久化存儲方案對消息進行存儲(主要的處理時間耗費在這裏)。爲了提升ActiveMQ在接受PERSISTENT Meaage時的性能,ActiveMQ容許開發人員聽從JMS API中的設置方式,爲消息發送端在發送PERSISTENT Meaage時提供異步方式:
// 使用異步傳輸 // 上文已經說過,若是發送的是NON_PERSISTENT Message // 那麼默認就是異步方式 connectionFactory.setUseAsyncSend(true); ......
一旦您進行了這樣的設置,就須要設置回執窗口:
// 一樣設置消息發送者在累計發送102400byte大小的消息後 // 等待服務端進行回執,以便肯定以前發送的消息是否被正確處理 // 肯定服務器端是否產生了過量的消息堆積,須要減慢消息生產端的生產速度 connectionFactory.setProducerWindowSize(102400); ......