在網上有人問,如何在activemq.xml裏面配置消息隊列的大小,這樣才保證隊列不會溢出!若是採用非持久化消息,那麼當大量發送失敗時候,首先大 量佔用內存。形成消息堆積,容易形成內存溢出,因此我的比較傾向於持久化消息的同時配合其餘方式的master/slave或者failover機制,盡 量保持消息的暢通。當咱們開發的Java的使用應用程序的時候,有的時候須要爲java應用指定對應的內存大小。當在測試的時候,有的同窗也許會遇到 ActiveMQ報內存不足的問題,咱們能夠修改ActiveMQ使用的內存的多少相似Java 的內存的設置,在ActiveMQ.bat或者activemq(linux)文件中html
以下:java
if "%ACTIVEMQ_BASE%" == "" set ACTIVEMQ_BASE=%ACTIVEMQ_HOME% if "%ACTIVEMQ_OPTS%" == "" set ACTIVEMQ_OPTS=-Xmx512M -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties if "%SUNJMX%" == "" set SUNJMX=-Dcom.sun.management.jmxremote
其中ACTIVEMQ_OPTS中指定ActiveMQ的使用的內存的大小。linux
下面爲ActiveMQ的內存分配信息。activemq.xml中的配置狀況:sql
<!-- The systemUsage controls the maximum amount of space the broker will use before slowing down producers. For more information, see: http://activemq.apache.org/producer-flow-control.html <systemUsage sendFailIfNoSpaceAfterTimeout="3000" sendFailIfNoSpace="true"> <systemUsage> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> <storeUsage> <storeUsage limit="1 gb"/> </storeUsage> <tempUsage> <tempUsage limit="100 mb"/> </tempUsage> </systemUsage> </systemUsage> >
memoryUsage:表示全部隊列對象佔用的內存大小爲20mb;apache
AbstractPendingMessageCursor計算空間的方法源代碼以下:緩存
public boolean hasSpace() { return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true; } public boolean isFull() { return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false; }
If you like, you can disable flow control for specific JMS queues and topics on the broker by setting the producerFlowControl
flag to false on the appropriate destination policy in the Broker configuration - e.g.app
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false"/> </policyEntries> </policyMap> </destinationPolicy>
ActiveMQ消息持久化
在broker中設置屬性persistent=」true」(默認是true),同時發送的消息也應該是persitent類型的。ActiveMQ消息持久化有三種方式:AMQ、KahaDB、JDBC。
AMQ是一種文件存儲形式,它具備寫入速度快和容易恢復的特色。消息存儲在一個個文件中,文件的默認大小爲32兆,若是一條消息 的大小超過了32兆,那麼這個值必須設置大點。當一個存儲文件中的消息已經所有被消費,那麼這個文件將被標識爲可刪除,在下一個清除階段,這個文件被刪 除。默認配置以下:
Java
1 2 3 |
<persistenceAdapter> <amqPersistenceAdapter directory="activemq-data"maxFileLength="32mb"/> </persistenceAdapter> |
AMQ的屬性:
屬性名稱 | 默認值 | 描述 |
directory | activemq-data | 消息文件和日誌的存儲目錄 |
useNIO | true | 使用NIO協議存儲消息 |
syncOnWrite | false | 同步寫到磁盤,這個選項對性能影響很是大 |
maxFileLength | 32mb | 一個消息文件的大小 |
persistentIndex | true | 消息索引的持久化,若是爲false,那麼索引保存在內存中 |
maxCheckpointMessageAddSize | 4kb | 一個事務容許的最大消息量 |
cleanupInterval | 30000 | 清除操做週期,單位ms |
indexBinSize | 1024 | 索引文件緩存頁面數,缺省爲1024,當amq擴充或者縮減存儲時,會鎖定整個broker,致使必定時間的阻塞,因此這個值應該調整到比較大,可是代碼中實現會動態伸縮,調整效果並不理想。 |
indexKeySize | 96 | 索引key的大小,key是消息ID |
indexPageSize | 16kb | 索引的頁大小 |
directoryArchive | archive | 存儲被歸檔的消息文件目錄 |
archiveDataLogs | false | 當爲true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
KahaDB是基於文件的本地數據庫儲存形式,雖然沒有AMQ的速度快,可是它具備強擴展性,恢復的時間比AMQ短,從5.4版本以後KahaDB作爲默認的持久化方式。默認配置以下:
Java
1 2 3 |
<persistenceAdapter> <kahaDB directory="activemq-data"journalMaxFileLength="32mb"/> </persistenceAdapter> |
KahaDB的屬性:
property name | default value | Comments |
directory | activemq-data | 消息文件和日誌的存儲目錄 |
indexWriteBatchSize | 1000 | 一批索引的大小,當要更新的索引量到達這個值時,更新到消息文件中 |
indexCacheSize | 10000 | 內存中,索引的頁大小 |
enableIndexWriteAsync | false | 索引是否異步寫到消息文件中 |
journalMaxFileLength | 32mb | 一個消息文件的大小 |
enableJournalDiskSyncs | true | 是否講非事務的消息同步寫入到磁盤 |
cleanupInterval | 30000 | 清除操做週期,單位ms |
checkpointInterval | 5000 | 索引寫入到消息文件的週期,單位ms |
ignoreMissingJournalfiles | false | 忽略丟失的消息文件,false,當丟失了消息文件,啓動異常 |
checkForCorruptJournalFiles | false | 檢查消息文件是否損壞,true,檢查發現損壞會嘗試修復 |
checksumJournalFiles | false | 產生一個checksum,以便可以檢測journal文件是否損壞。 |
5.4版本以後有效的屬性: | ||
archiveDataLogs | false | 當爲true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
directoryArchive | null | 存儲被歸檔的消息文件目錄 |
databaseLockedWaitDelay | 10000 | 在使用負載時,等待得到文件鎖的延遲時間,單位ms |
maxAsyncJobs | 10000 | 同個生產者產生等待寫入的異步消息最大量 |
concurrentStoreAndDispatchTopics | false | 當寫入消息的時候,是否轉發主題消息 |
concurrentStoreAndDispatchQueues | true | 當寫入消息的時候,是否轉發隊列消息 |
5.6版本以後有效的屬性: | ||
archiveCorruptedIndex | false | 是否歸檔錯誤的索引 |
從5.6版本以後,有可能發佈經過多個kahadb持久適配器來實現分佈式目標隊列存儲。何時用呢?若是有一個快速的生產者 和消費者,當某一個時刻生產者發生了不規範的消費,那麼有可能產生一條消息被存儲在兩個消息文件中,同時,有些目標隊列是危險的而且要求訪問磁盤。在這種 狀況下,你應該用通配符來使用mKahaDB。若是目標隊列是分佈的,事務是能夠跨越多個消息文件的。
每一個KahaDB的實例均可以配置單獨的適配器,若是沒有目標隊列提交給filteredKahaDB,那麼意味着對全部的隊列有效。若是一個隊列沒有對應的適配器,那麼將會拋出一個異常。配置以下:
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
<persistenceAdapter> <mKahaDB directory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters> <!--match all queues--> <filteredKahaDB queue=">"> <persistenceAdapter> <kahaDB journalMaxFileLength="32mb"/> </persistenceAdapter> </filteredKahaDB>
<!--match all destinations--> <filteredKahaDB> <persistenceAdapter> <kahaDB enableJournalDiskSyncs="false"/> </persistenceAdapter> </filteredKahaDB> </filteredPersistenceAdapters> </mKahaDB> </persistenceAdapter> |
若是filteredKahaDB的perDestination屬性設置爲true,那麼匹配的目標隊列將會獲得本身對應的KahaDB實例。配置以下:
Java
1 2 3 4 5 6 7 8 9 10 11 12 |
<persistenceAdapter> <mKahaDB directory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters> <!--kahaDB per destinations--> <filteredKahaDB perDestination="true"> <persistenceAdapter> <kahaDB journalMaxFileLength="32mb"/> </persistenceAdapter> </filteredKahaDB> </filteredPersistenceAdapters> </mKahaDB> </persistenceAdapter> |
配置JDBC適配器:
Java
1 2 3 |
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"createTablesOnStartup="false"/> </persistenceAdapter> |
dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啓動的時候建立數據表,默認值是true,這樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲true,以後改爲false。
MYSQL持久化bean
Java
1 2 3 4 5 6 7 |
<bean id="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close"> <propertyname="driverClassName"value="com.mysql.jdbc.Driver"/> <propertyname="url"value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <propertyname="username"value="activemq"/> <propertyname="password"value="activemq"/> <propertyname="poolPreparedStatements"value="true"/> </bean> |
SQL Server持久化bean
Java
1 2 3 4 5 6 7 |
<bean id="mssql-ds"class="net.sourceforge.jtds.jdbcx.JtdsDataSource"destroy-method="close"> <propertyname="serverName"value="SERVERNAME"/> <propertyname="portNumber"value="PORTNUMBER"/> <propertyname="databaseName"value="DATABASE NAME"/> <propertyname="user"value="USER"/> <propertyname="password"value="PASSWORD"/> </bean> |
Oracle持久化bean
Java
1 2 3 4 5 6 7 8 |
<bean id="oracle-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close"> <propertyname="driverClassName"value="oracle.jdbc.driver.OracleDriver"/> <propertyname="url"value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/> <propertyname="username"value="activemq"/> <propertyname="password"value="activemq"/> <propertyname="maxActive"value="200"/> <propertyname="poolPreparedStatements"value="true"/> </bean> |
DB2持久化bean
Java
1 2 3 4 5 6 7 8 |
<bean id="db2-ds"class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <propertyname="driverClassName"value="com.ibm.db2.jcc.DB2Driver"/> <propertyname="url"value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/> <propertyname="username"value="activemq"/> <propertyname="password"value="activemq"/> <propertyname="maxActive"value="200"/> <propertyname="poolPrepar |