首先簡單介紹一下ActiveMQ。ActiveMQ是由Apache軟件基金會提供的開源免費消息服務器,目前版本是5.8.0。web
ActiveMQ具備如下特色:數據庫
基於JMS 1.1和J2EE 1.4規範;apache
支持多種鏈接協議:HTTP/S,IP組播,SSL,STOMP,TCP,UDP,XMPP等;緩存
支持多種消息持久化機制:文件形式持久化(KahaDB),關係數據庫形式持久化(JDBC);安全
插件化的安全機制:ActiveMQ支持插件開發,而且它的安全機制就是以插件形式實現靈活配置的;服務器
支持嵌入Java應用,ActiveMQ就能夠做爲一個單獨服務,也能夠直接嵌入到其餘Java應用中;網絡
支持與應用服務器進行集成:支持Apache Tomcat,Jetty,ApacheGeronimo,Jboss;數據結構
多種語言的客戶端API:支持C/C++,.NET,Perl,PHP,Python,Ruby等;app
集羣;webapp
動態化的簡單管理。
ActiveMQ使用Java開發,使用Spring配置bean,使用Maven構建。
ActiveMQ 5.6.0的源代碼目錄結構如圖所示,從pom文件中能夠看出,共包含23個模塊,好比核心模塊activemq-core,web控制檯模塊activemq-web-console,文件形式的消息持久化模塊kahadb等。
使用Maven編譯、打包後,在assembly/target下ActiveMQ的生成可執行包。
ActiveMQ的執行包目錄結構如圖所示:
bin目錄下是啓動腳本;
conf目錄下是配置文件;
data目錄下是消息持久化的文件;
webapps是web控制檯目錄。
經過bin目錄下的腳本能夠啓動ActiveMQ服務,從配置文件能夠看出,ActiveMQ服務也是經過Spring來進行配置的,整個配置文件實質就是對一個broker實例進行相關配置,從命名空間能夠看出,ActiveMQ broker實例其實就是一個org.apache.activemq.xbean. XBeanBrokerService類的實例,而XBeanBrokerService繼承自org.apache.activemq.broker.BrokerService,BrokerService是ActiveMQ中的一個核心類,用於對外提供消息服務。
ActiveMQbroker的配置包含如下部分:
目的地策略:用來對目的地(隊列、主題)的相關策略進行配置,包括流量控制、分發策略等;
JMX管理:用來配置JMX;
持久化設置:
存儲設置:
鏈接器設置:
Web控制檯設置:ActiveMQ經過內嵌Jetty,用來提供Web控制檯,可以對鏈接、主題、隊列、消費者等進行可視化的管理。
在ActiveMQ中,隊列或主題的地址名稱可使用「.」號分割的方式來表示,例如queue.news.subject0-10。
而且ActiveMQ支持使用通配符表示某一類隊列和主題的地址,通配符包括:
.,用於分割地址中的名稱;
*,用於匹配「.」之間的任意字符;
>,用於匹配任意字符。
在目的地策略中,包含多個策略實體,策略實體既能夠描述某一個隊列或主題的策略,也能夠經過通配符描述某一類隊列或主題的策略。
目的地策略配置示例如上所示,該策略實體對全部主題進行配置,對每一個主題進行流量控制,內存限制爲1M,將消息引用保存在內存中。
如下列出了策略實體的部分屬性,所有屬性能夠從ActiveMQ官網查看到。
策略實體中還包含一些子標籤用於設置一些具體的策略,如下列出部分子標籤:
消息分發策略;
死信隊列策略:當消息重複發送屢次仍未成功時,ActiveMQ將向死信隊列發送一個消息,死信隊列策略對該操做進行配置,好比設置使用共享的死信隊列或單獨的死信隊列;
消息撤銷策略:當消息消費較慢須要刪除消息時使用,好比撤銷最久的消息;
消息數量限制策略:用於設置持久化消息數量最大值;
訂閱恢復策略:當訂閱者從新鏈接時使用,好比只恢復最後一個消息;
持久訂閱這的消息引用策略,隊列的消息引用策略,訂閱者的消息引用策略:用於設置消息應用存儲位置,好比將消息引用存儲於內存中;
慢消費者策略:當消息消費較慢時使用,好比直接退出。
下面具體再介紹一下分發策略。
首先介紹一下預讀取機制,爲了提升消息消費速度,在一次消息接收過程當中,ActiveMQ經過預讀取機制將消息儘量多地推送給消費者,在消費者客戶端緩存。
但爲了防止消費者緩存溢出,ActiveMQ經過prefetchlimit控制當前推送給消費者且未收到確認的消息數量。
prefetch limit的默認值是:持久化消息隊列爲1000,非持久化消息隊列爲1000,持久化消息主題爲100,非持久化消息主題爲-1。
若prefetch limit=1,則消費者每次只會接收一個消息,至關於關閉預讀取機制。
若prefetchlimit=0,則消息只有在消費者主動拉取時纔會被接收,而不會被推送給消費者。
對於分發策略,有以下具體的策略,其中roundRobin策略是當有多個消費者時,將消息平均地發給各個消費者,而不是採用預讀取機制先將消息所有發往某個消費者直至達到prefetch limit。而strictOrder策略相反,是將消息先所有發往某個消費者,但該策略能夠保證當主題中有多個消息生產者,且有多個消息消費者時,每一個消息消費者接收到的消息順序是一致的。
流量控制是爲了防止消息生產較快,而消費較慢,致使隊列或主題堵塞。
在ActiveMQ5.0以前,ActiveMQ使用TCP協議自己的流量控制機制,這種方法的不足是隻能對整個鏈接進行流量控制,而不能對單個生產者進行流量控制,並且當多個消息生產者和消費者使用同一個鏈接時可能會形成死鎖。
從ActiveMQ5.0開始,ActiveMQ支持對單個生產者進行流量控制。流量控制可在生產者客戶端和服務器端進行配置。
若是生產者客戶端異步發送消息(useAsyncSend置爲true),發送消息時線程不會阻塞等待消息服務器返回確認,此時就須要在生產者客戶端配置流量控制,經過setProducerWindowSize設置一個最大值,即生產者發送的未接收到確認的消息不能超過該最大值,若超過,則等待。
服務器端的流量控制配置在兩個地方:
1)首先在目的地策略中,經過producerFlowControl能夠對每一個目的地設置是否進行流量控制,memoryLimit表示消息存儲在內存中的最大量,vmCursor表示在內存中僅保存消息的遊標,這樣能夠在內存中存儲儘量多的消息。
2)在存儲設置中,能夠對整個消息服務器的存儲用度進行配置,memoryUsage表示ActiveMQ使用的內存,storeUsage表示持久化消息存儲文件的大小,tempUsage表示非持久化消息存儲的文件大小。在存儲配置中還能夠設置當存儲用度不足時系統如何處理,除默認等待外還支持sendFailIfNoSpace,sendFailIfNoSpaceAfterTimeout。
ActiveMQ支持JMX,在ActiveMQ中配置JMX的示例以下所示。在啓動後,能夠經過Java工具jconsole鏈接ActiveMQ,對其進行監控。
對於消息持久化,消息以先進先出的方式存儲於隊列中。只有接收到消費者的確認,消息纔會從隊列中出隊。
對於具備持久訂閱者的主題,主題中只保存一份消息。每一個持久訂閱者保存一個指向最後一個接收消息的指針。只有接收到全部持久訂閱者對於消息的確認,消息纔會從主題中刪除。
下面將介紹4種消息存儲機制。
KahaDB是ActiveMQ推薦的消息存儲機制,它基於文件,是最快的一種消息存儲機制。
KahaDB的實現機制如圖所示,首先全部消息數據追加寫入log文件,log文件的大小有限制,若達到限制,則建立一個新文件,若log文件中的消息已所有發送出去,則該文件被刪除。隊列、主題使用B樹數據結構存儲,這樣可以快速查詢到其中的消息,B樹中的消息實際存儲對log文件中數據的引用。同時隊列、主題的消息還保存在緩存中,以提升訪問速度。
與KahaDB相似,基於文件,但與KahaDB不一樣的是,每一個隊列有獨立的索引文件,多用於消息量大的場景,但不適用於隊列多的場景。
在客戶端鏈接服務器或服務器之間互連時,ActiveMQ支持多種鏈接協議,如下是這些協議以及使用說明,其中,TCP是ActiveMQ默認使用的網絡協議,STOMP是一種面向簡單文本的消息協議,主要用於多語言支持,實踐中,咱們在PHP和Python的客戶端鏈接服務器時,使用了該協議,VM主要用於訪問在同一個JVM中運行的服務器。
在客戶端使用vm鏈接消息服務器時,如vm://brokerName,若同一個JVM內存在以該brokerName命名的消息服務器實例,則鏈接至該實例,若不存在,則建立一個以該brokerName命名的消息服務器實例,並鏈接。
在鏈接時採用的URI格式以下所示,第一種是單一URI,表示一個消息服務器鏈接地址,第二種是組合URI,即將多個消息服務器地址組合起來。
在消息服務器配置中,鏈接有兩種配置:
transportconnector,用於配置客戶端與服務器之間的鏈接,向客戶端提供鏈接端口,ActiveMQ一般佔有61616端口對外提供tcp鏈接;
networkconnector,用於配置服務器與服務器之間的鏈接,實現服務器網絡,下面將具體介紹ActiveMQ的兩種服務器網絡。
首先是靜態網絡。
在網絡中服務器配置已知的狀況下,可使用static建立靜態網絡,例如已知網絡中已有一消息服務器BrokerB,則在BrokerA的配置中能夠添加以上配置,在啓動BrokerA時,BrokerA會建立與BrokerB的鏈接,當生產者將消息發送至BrokerA,且消費者從BrokerB接收該消息時,BrokerA會自動將消息轉發至BrokerB。
在這種配置下,BrokerA至BrokerB的鏈接是單向的,即消息只能從BrokerA轉發至BrokerB。若是須要將鏈接配置成雙向的,能夠將duplex屬性置成true,這樣,BrokerA和BrokerB便可以向對方轉發消息,也能夠從對方接收消息。
靜態網絡須要已知服務配置狀況,且不易於進行後期擴展,經過使用動態網絡可解決以上問題。在動態網絡中,每一個消息服務器須要進行以上配置。在服務啓動後,會自動使用IP組播在網絡中尋找其餘消息服務器實例,並建立鏈接。
在客戶端鏈接消息服務器時,既可使用單一URI鏈接單個服務器,也可使用組合URI從多個服務器中選擇一個進行鏈接。
在組合URL中,failover是一種比較經常使用的客戶端鏈接方式,使用failover時,客戶端會從多個服務器地址中隨機選擇一個進行鏈接,當鏈接失效時,會嘗試鏈接其餘的服務器。
若是隻鏈接一個服務器,也建議在服務器地址前再加上failover,這樣能夠創建重連機制,提升系統健壯性。
discovery與failover相似,可是經過組播從動態網絡中查詢可用的服務器,並從中隨機選擇一個進行鏈接,當鏈接失效時,也會嘗試鏈接其餘的服務器。
peer與vm相似,在使用peer鏈接時,會自動在JVM內建立服務器,另外,還會在創建此服務器與網絡中同組服務器的鏈接。
peer的應用場景是客戶端與服務器常常會有鏈接失效發生,但又須要在鏈接失效時,客戶端仍能夠正常工做。使用peer,客戶端能夠在本地JVM內建立服務器並與其通訊,當與遠程服務器鏈接正常時,本地服務器會再與遠程服務器進行通訊。
fanout能夠向靜態網絡或動態網絡發送消息,網絡中的每一個服務器都會接收到消息。